Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r154632607 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -27,50 +30,46 @@ import org.apache.calcite.rex.RexNode; import org.apache.storm.sql.planner.StormRelUtils; import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; -import org.apache.storm.sql.planner.trident.TridentPlanCreator; -import org.apache.storm.sql.runtime.ISqlTridentDataSource; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.fluent.IAggregatableStream; -import org.apache.storm.tuple.Fields; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; -import java.util.List; +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { + private final int primaryKeyIndex; -public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel { - public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, + boolean flattened, int primaryKeyIndex) { super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); + this.primaryKeyIndex = primaryKeyIndex; } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), - sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), + sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), + primaryKeyIndex); } @Override - public void tridentPlan(TridentPlanCreator planCreator) throws Exception { + public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { // SingleRel RelNode input = getInput(); - StormRelUtils.getStormRelInput(input).tridentPlan(planCreator); - Stream inputStream = planCreator.pop().toStream(); - - String stageName = StormRelUtils.getStageName(this); + StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); + Stream<Values> inputStream = planCreator.pop(); Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); - List<String> inputFields = this.input.getRowType().getFieldNames(); - List<String> outputFields = getRowType().getFieldNames(); + // Calcite ensures that the value is structurized to the table definition --- End diff -- To elaborate, if table BAR is defined as `ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER` and query like `INSERT INTO BAR SELECT NAME, ID FROM FOO` is executed, Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT.
---