Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r154632607
  
    --- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
 ---
    @@ -27,50 +30,46 @@
     import org.apache.calcite.rex.RexNode;
     import org.apache.storm.sql.planner.StormRelUtils;
     import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
    -import org.apache.storm.sql.planner.trident.TridentPlanCreator;
    -import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    -import org.apache.storm.trident.Stream;
    -import org.apache.storm.trident.fluent.IAggregatableStream;
    -import org.apache.storm.tuple.Fields;
    +import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
    +import 
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
    +import org.apache.storm.streams.Stream;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.tuple.Values;
     
    -import java.util.List;
    +public class StreamsStreamInsertRel extends StormStreamInsertRelBase 
implements StreamsRel {
    +    private final int primaryKeyIndex;
     
    -public class TridentStreamInsertRel extends StormStreamInsertRelBase 
implements TridentRel {
    -    public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, 
Operation operation, List<String> updateColumnList, List<RexNode> 
sourceExpressionList, boolean flattened) {
    +    public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
    +                                  RelNode child, Operation operation, 
List<String> updateColumnList, List<RexNode> sourceExpressionList,
    +                                  boolean flattened, int primaryKeyIndex) {
             super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList, sourceExpressionList, flattened);
    +        this.primaryKeyIndex = primaryKeyIndex;
         }
     
         @Override
         public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    -        return new TridentStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
    -                sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened());
    +        return new StreamsStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
    +                sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened(),
    +                primaryKeyIndex);
         }
     
         @Override
    -    public void tridentPlan(TridentPlanCreator planCreator) throws 
Exception {
    +    public void streamsPlan(StreamsPlanCreator planCreator) throws 
Exception {
             // SingleRel
             RelNode input = getInput();
    -        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
    -        Stream inputStream = planCreator.pop().toStream();
    -
    -        String stageName = StormRelUtils.getStageName(this);
    +        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
    +        Stream<Values> inputStream = planCreator.pop();
     
             Preconditions.checkArgument(isInsert(), "Only INSERT statement is 
supported.");
     
    -        List<String> inputFields = this.input.getRowType().getFieldNames();
    -        List<String> outputFields = getRowType().getFieldNames();
    +        // Calcite ensures that the value is structurized to the table 
definition
    --- End diff --
    
    To elaborate, if table BAR is defined as `ID INTEGER PK, NAME VARCHAR, 
DEPTID INTEGER` and query like `INSERT INTO BAR SELECT NAME, ID FROM FOO` is 
executed, Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the 
value before INSERT.


---

Reply via email to