This is an automated email from the ASF dual-hosted git repository. vladimirsitnikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push: new 0fd4ca8 [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson) 0fd4ca8 is described below commit 0fd4ca89d0cc8fa4ea8fd17c1657b28a3d8cf533 Author: Paul Jackson <paul.jack...@stardog.com> AuthorDate: Mon Jan 22 13:21:59 2018 -0500 [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson) fixes #614 --- .../org/apache/calcite/plan/RelOptCluster.java | 2 +- .../rel/metadata/JaninoRelMetadataProvider.java | 38 ++++++++-------- .../org/apache/calcite/test/RelBuilderTest.java | 53 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java index da6b8eb..a078354 100644 --- a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java +++ b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java @@ -147,7 +147,7 @@ public class RelOptCluster { * If you have a {@link RelOptRuleCall} available, * for example if you are in a {@link RelOptRule#onMatch(RelOptRuleCall)} * method, then use {@link RelOptRuleCall#getMetadataQuery()} instead. */ - public RelMetadataQuery getMetadataQuery() { + public synchronized RelMetadataQuery getMetadataQuery() { if (mq == null) { mq = RelMetadataQuery.instance(); } diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java old mode 100644 new mode 100755 index e206ff3..888f10d --- a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java +++ b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java @@ -269,44 +269,46 @@ public class JaninoRelMetadataProvider implements RelMetadataProvider { buff.append(", r"); safeArgList(buff, method.e) .append(");\n") - .append(" final Object v = mq.map.get(key);\n") - .append(" if (v != null) {\n") - .append(" if (v == ") + .append(" synchronized (mq.map) {") + .append(" final Object v = mq.map.get(key);\n") + .append(" if (v != null) {\n") + .append(" if (v == ") .append(NullSentinel.class.getName()) .append(".ACTIVE) {\n") - .append(" throw ") + .append(" throw ") .append(CyclicMetadataException.class.getName()) .append(".INSTANCE;\n") - .append(" }\n") - .append(" if (v == ") + .append(" }\n") + .append(" if (v == ") .append(NullSentinel.class.getName()) .append(".INSTANCE) {\n") - .append(" return null;\n") - .append(" }\n") - .append(" return (") + .append(" return null;\n") + .append(" }\n") + .append(" return (") .append(method.e.getReturnType().getName()) .append(") v;\n") - .append(" }\n") - .append(" mq.map.put(key,") + .append(" }\n") + .append(" mq.map.put(key,") .append(NullSentinel.class.getName()) .append(".ACTIVE);\n") - .append(" try {\n") - .append(" final ") + .append(" try {\n") + .append(" final ") .append(method.e.getReturnType().getName()) .append(" x = ") .append(method.e.getName()) .append("_(r, mq"); argList(buff, method.e) .append(");\n") - .append(" mq.map.put(key, ") + .append(" mq.map.put(key, ") .append(NullSentinel.class.getName()) .append(".mask(x));\n") - .append(" return x;\n") - .append(" } catch (") + .append(" return x;\n") + .append(" } catch (") .append(Exception.class.getName()) .append(" e) {\n") - .append(" mq.map.remove(key);\n") - .append(" throw e;\n") + .append(" mq.map.remove(key);\n") + .append(" throw e;\n") + .append(" }\n") .append(" }\n") .append(" }\n") .append("\n") diff --git a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java old mode 100644 new mode 100755 index d345241..ca02f4b --- a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java +++ b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java @@ -69,11 +69,18 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.apache.calcite.test.Matchers.hasTree; @@ -1984,6 +1991,52 @@ public class RelBuilderTest { assertThat(root, hasTree(expected)); } + @Test public void testConcurrentProject() { + // On my quad core hyperthreaded i7, 2 threads passed 10 times in 10 attempts, + // 4 threads passed 2/10, and 8 threads passed 0/0. + final int numThreads = 16; + ExecutorService service = Executors.newFixedThreadPool(numThreads); + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + + // When synchronized(mq.map) is not there, the error happens at iteration 0 or 1 + for (int iteration = 0; iteration < 10; iteration++) { + barrier.reset(); + final FrameworkConfig config = config().build(); + final RelBuilder builder = RelBuilder.create(config); + final RelNode r = builder.scan("EMP").build(); + List<Future<RelNode>> futures = new ArrayList<>(numThreads); + final Callable<RelNode> callable = () -> { + final RelBuilder builder1 = RelBuilder.create(config); + builder1.push(r); + final List<RexNode> fields = Lists.newArrayList((RexNode) builder1.field("ENAME")); + final List<String> fieldNames = Lists.newArrayList("F1"); + barrier.await(); + builder1.project(fields, fieldNames, true); + return builder1.build(); + }; + for (int i = 0; i < numThreads; i++) { + futures.add(service.submit(callable)); + } + for (int i = 0; i < futures.size(); i++) { + Future<RelNode> f = futures.get(i); + final RelNode node; + try { + node = f.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException( + "Unable to build relation concurrently: " + e.getMessage() + + ", exception at iteration " + iteration, e.getCause()); + } + String expected = "LogicalProject(F1=[$1])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + assertThat("Plan for thread #" + i, node, hasTree(expected)); + } + } + } + /** Tests that a sort on a field followed by a limit gives the same * effect as calling sortLimit. *