[ 
https://issues.apache.org/jira/browse/BEAM-4562?focusedWorklogId=121770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121770
 ]

ASF GitHub Bot logged work on BEAM-4562:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jul/18 05:45
            Start Date: 11/Jul/18 05:45
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #5918: BEAM-4562: [SQL] 
Fix INSERT VALUES in JDBC
URL: https://github.com/apache/beam/pull/5918
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 8007d1eae92..fd2eddf171d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -91,6 +91,7 @@ public TableModify toModificationRel(
         updateColumnList,
         sourceExpressionList,
         flattened,
-        beamTable);
+        beamTable,
+        pipelineOptions);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 75a2e901eba..4b66e93d514 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -21,12 +21,14 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -37,7 +39,12 @@
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CalcRemoveRule;
+import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.RuleSet;
 
 /**
  * Calcite JDBC driver with Beam defaults.
@@ -73,6 +80,21 @@
     } finally {
       Thread.currentThread().setContextClassLoader(origLoader);
     }
+    // inject beam rules into planner
+    Hook.PLANNER.addThread(
+        new Function<RelOptPlanner, Void>() {
+          @Override
+          public Void apply(RelOptPlanner planner) {
+            for (RuleSet ruleSet : BeamRuleSets.getRuleSets()) {
+              for (RelOptRule rule : ruleSet) {
+                planner.addRule(rule);
+              }
+            }
+            planner.removeRule(CalcRemoveRule.INSTANCE);
+            return null;
+          }
+        });
+    // register JDBC driver
     INSTANCE.register();
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 350bccfa02b..65c97c4f88d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -41,6 +42,7 @@
     implements BeamRelNode, RelStructuredTypeFlattener.SelfFlatteningRel {
 
   private final BeamSqlTable sqlTable;
+  private final Map<String, String> pipelineOptions;
   private boolean isFlattening = false;
 
   public BeamIOSinkRel(
@@ -52,7 +54,8 @@ public BeamIOSinkRel(
       List<String> updateColumnList,
       List<RexNode> sourceExpressionList,
       boolean flattened,
-      BeamSqlTable sqlTable) {
+      BeamSqlTable sqlTable,
+      Map<String, String> pipelineOptions) {
     super(
         cluster,
         cluster.traitSetOf(BeamLogicalConvention.INSTANCE),
@@ -64,6 +67,7 @@ public BeamIOSinkRel(
         sourceExpressionList,
         flattened);
     this.sqlTable = sqlTable;
+    this.pipelineOptions = pipelineOptions;
   }
 
   @Override
@@ -79,7 +83,8 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> 
inputs) {
             getUpdateColumnList(),
             getSourceExpressionList(),
             flattened,
-            sqlTable);
+            sqlTable,
+            pipelineOptions);
     newRel.traitSet = traitSet;
     return newRel;
   }
@@ -120,4 +125,9 @@ public void register(RelOptPlanner planner) {
       return input;
     }
   }
+
+  @Override
+  public Map<String, String> getPipelineOptions() {
+    return pipelineOptions;
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
index 4895962b161..fe0e1df5062 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
@@ -17,16 +17,12 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.rules.CalcRemoveRule;
-import org.apache.calcite.tools.RuleSet;
 
 /** Convertion for Beam SQL. */
 public enum BeamLogicalConvention implements Convention {
@@ -53,14 +49,7 @@ public boolean satisfies(RelTrait trait) {
   }
 
   @Override
-  public void register(RelOptPlanner planner) {
-    for (RuleSet ruleSet : BeamRuleSets.getRuleSets()) {
-      for (RelOptRule rule : ruleSet) {
-        planner.addRule(rule);
-      }
-    }
-    planner.removeRule(CalcRemoveRule.INSTANCE);
-  }
+  public void register(RelOptPlanner planner) {}
 
   @Override
   public String toString() {
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index b245c206a27..bd5aecb04a4 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -136,7 +136,8 @@ public void testToEnumerable_count() {
             null,
             null,
             false,
-            new FakeTable());
+            new FakeTable(),
+            null);
 
     Enumerable<Object> enumerable = 
BeamEnumerableConverter.toEnumerable(options, node);
     Enumerator<Object> enumerator = enumerable.enumerator();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 121770)
    Time Spent: 50m  (was: 40m)

> [SQL] Fix INSERT VALUES in JdbcDriver 
> --------------------------------------
>
>                 Key: BEAM-4562
>                 URL: https://issues.apache.org/jira/browse/BEAM-4562
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Executing INSERT VALUES against JdbcDriver fails. Executing similar 
> statements against BeamSqlEnv works fine. Example:
> {code:java}
>     TestTableProvider tableProvider = new TestTableProvider();
>     Connection connection = JdbcDriver.connect(tableProvider);
>     connection
>         .createStatement()
>         .executeUpdate("CREATE TABLE person (id BIGINT, name VARCHAR) TYPE 
> 'test'");
>     connection.createStatement().executeQuery("INSERT INTO person VALUES(3, 
> 'yyy')");
> {code}
>  Output:
> {code}
> java.sql.SQLException: Error while executing SQL "INSERT INTO person 
> VALUES(3, 'yyy')": Node [rel#9:Subset#1.ENUMERABLE.[]] could not be 
> implemented; planner state:
> Root: rel#9:Subset#1.ENUMERABLE.[]
> Original rel:
> BeamIOSinkRel(subset=[rel#9:Subset#1.ENUMERABLE.[]], table=[[beam, person]], 
> operation=[INSERT], flattened=[false]): rowcount = 1.0, cumulative cost = 
> {1.0 rows, 0.0 cpu, 0.0 io}, id = 6
>   LogicalValues(subset=[rel#5:Subset#0.NONE.[]], tuples=[[{ 3, 'yyy' }]]): 
> rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io}, id = 0
> Sets:
> Set#0, type: RecordType(BIGINT id, VARCHAR name)
>       rel#5:Subset#0.NONE.[], best=null, importance=0.81
>               rel#0:LogicalValues.NONE.[[0, 1], [1]](type=RecordType(BIGINT 
> id, VARCHAR name),tuples=[{ 3, 'yyy' }]), rowcount=1.0, cumulative cost={inf}
>       rel#14:Subset#0.BEAM_LOGICAL.[], best=null, importance=0.81
>       rel#20:Subset#0.ENUMERABLE.[], best=rel#19, importance=0.405
>               rel#19:EnumerableValues.ENUMERABLE.[[0, 1], 
> [1]](type=RecordType(BIGINT id, VARCHAR name),tuples=[{ 3, 'yyy' }]), 
> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io}
> Set#1, type: RecordType(BIGINT ROWCOUNT)
>       rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=0.9
>               
> rel#6:BeamIOSinkRel.BEAM_LOGICAL.[](input=rel#5:Subset#0.NONE.[],table=[beam, 
> person],operation=INSERT,flattened=false), rowcount=1.0, cumulative cost={inf}
>               
> rel#15:BeamIOSinkRel.BEAM_LOGICAL.[](input=rel#14:Subset#0.BEAM_LOGICAL.[],table=[beam,
>  person],operation=INSERT,flattened=false), rowcount=1.0, cumulative 
> cost={inf}
>       rel#9:Subset#1.ENUMERABLE.[], best=null, importance=1.0
>               
> rel#10:AbstractConverter.ENUMERABLE.[](input=rel#7:Subset#1.BEAM_LOGICAL.[],convention=ENUMERABLE,sort=[]),
>  rowcount=1.0, cumulative cost={inf}
>               
> rel#11:BeamEnumerableConverter.ENUMERABLE.[](input=rel#7:Subset#1.BEAM_LOGICAL.[]),
>  rowcount=1.0, cumulative cost={inf}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to