[ 
https://issues.apache.org/jira/browse/BEAM-5112?focusedWorklogId=179566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179566
 ]

ASF GitHub Bot logged work on BEAM-5112:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Dec/18 20:51
            Start Date: 28/Dec/18 20:51
    Worklog Time Spent: 10m 
      Work Description: apilloud commented on pull request #6417: [BEAM-5112] Use Calcite codegen to implement BeamCalcRel
URL: https://github.com/apache/beam/pull/6417#discussion_r244405554
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
 ##########
 @@ -68,15 +121,90 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
           BeamCalcRel.class.getSimpleName(),
           pinput);
       PCollection<Row> upstream = pinput.get(0);
+      Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+      final SqlConformance conformance = SqlConformanceEnum.MYSQL_5;
+      final JavaTypeFactory typeFactory =
+          new JavaTypeFactoryImpl(BeamRelDataTypeSystem.INSTANCE) {
+            @Override
+            public Type getJavaClass(RelDataType type) {
+              if (type instanceof BasicSqlType || type instanceof IntervalSqlType) {
+                if (type.getSqlTypeName() == SqlTypeName.FLOAT) {
+                  return type.isNullable() ? Float.class : float.class;
+                }
+              }
+              return super.getJavaClass(type);
+            }
+          };
+      final BlockBuilder builder = new BlockBuilder();
+
+      final PhysType physType =
+          PhysTypeImpl.of(typeFactory, getRowType(), JavaRowFormat.ARRAY, false);
+
+      Expression input =
+          Expressions.convert_(Expressions.call(processContextParam, "element"), Row.class);
+
+      final RexBuilder rexBuilder = getCluster().getRexBuilder();
+      final RelMetadataQuery mq = RelMetadataQuery.instance();
+      final RelOptPredicateList predicates = mq.getPulledUpPredicates(getInput());
+      final RexSimplify simplify = new RexSimplify(rexBuilder, predicates, false, RexUtil.EXECUTOR);
+      final RexProgram program = BeamCalcRel.this.program.normalize(rexBuilder, simplify);
 
-      BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(BeamCalcRel.this.getProgram());
+      Expression condition =
+          RexToLixTranslator.translateCondition(
+              program,
+              typeFactory,
+              builder,
+              new InputGetterImpl(input, upstream.getSchema()),
+              null,
+              conformance);
 
-      Schema schema = CalciteUtils.toSchema(rowType);
-      PCollection<Row> projectStream =
-          upstream
-              .apply(ParDo.of(new CalcFn(executor, CalciteUtils.toSchema(rowType))))
-              .setRowSchema(schema);
-      projectStream.setRowSchema(CalciteUtils.toSchema(getRowType()));
+      List<Expression> expressions =
+          RexToLixTranslator.translateProjects(
+              program,
+              typeFactory,
+              conformance,
+              builder,
+              physType,
+              DataContext.ROOT,
+              new InputGetterImpl(input, upstream.getSchema()),
+              null);
+
+      // output = Row.withSchema(outputSchema)
+      Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);
+      Method addValue = Types.lookupMethod(Row.Builder.class, "addValue", Object.class);
+
+      for (int index = 0; index < expressions.size(); index++) {
+        Expression value = expressions.get(index);
+        FieldType toType = outputSchema.getField(index).getType();
+
+        // .addValue(value)
+        output = Expressions.call(output, addValue, castOutput(value, toType));
+      }
+
+      // .build();
+      output = Expressions.call(output, "build");
+
+      // if (condition) {
+      //   c.output(output);
+      // }
+      builder.add(
 
 Review comment:
   An example of the generated code is in the comment immediately above this block.
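
   For readers without the PR open, the following is a rough sketch of the shape such generated code might take. It is based only on the inline comments in the hunk above: Row.withSchema(outputSchema), .addValue(value), .build(), and the "if (condition) { c.output(output); }" guard. The Row and Builder classes, the list-based schema, the output list standing in for c.output, and the example filter and projection are all hypothetical stand-ins, not Beam's API and not the actual output of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GeneratedCalcSketch {

  /** Hypothetical stand-in for Beam's Row; holds positional values only. */
  public static final class Row {
    private final List<Object> values;

    private Row(List<Object> values) {
      this.values = values;
    }

    public Object getValue(int index) {
      return values.get(index);
    }

    /** Mirrors the "Row.withSchema(outputSchema)" comment in the diff. */
    public static Builder withSchema(List<String> schema) {
      return new Builder(schema);
    }

    /** Hypothetical stand-in for Row.Builder. */
    public static final class Builder {
      private final List<String> schema;
      private final List<Object> values = new ArrayList<>();

      private Builder(List<String> schema) {
        this.schema = schema;
      }

      public Builder addValue(Object value) {
        values.add(value);
        return this;
      }

      public Row build() {
        if (values.size() != schema.size()) {
          throw new IllegalStateException("value count does not match schema");
        }
        return new Row(new ArrayList<>(values));
      }
    }
  }

  /**
   * Hand-written approximation of a generated body for an assumed query like
   * "SELECT f0 + 1, f1 FROM t WHERE f0 > 10". The list "out" stands in for
   * c.output(...).
   */
  public static void processElement(Row element, List<String> outputSchema, List<Row> out) {
    int f0 = (Integer) element.getValue(0);
    // if (condition) {
    if (f0 > 10) {
      // c.output(Row.withSchema(outputSchema).addValue(...).addValue(...).build());
      out.add(
          Row.withSchema(outputSchema)
              .addValue(f0 + 1)
              .addValue(element.getValue(1))
              .build());
    }
  }

  public static void main(String[] args) {
    List<String> inputSchema = Arrays.asList("f0", "f1");
    List<String> outputSchema = Arrays.asList("f0_plus_1", "f1");
    List<Row> out = new ArrayList<>();

    processElement(Row.withSchema(inputSchema).addValue(11).addValue("kept").build(), outputSchema, out);
    processElement(Row.withSchema(inputSchema).addValue(3).addValue("dropped").build(), outputSchema, out);

    System.out.println("rows emitted: " + out.size());
  }
}
```

   The point of the sketch is the structure: one chained builder call per projected expression, wrapped in a single conditional for the filter, which is what the loop over "expressions" and the final builder.add( call in the hunk appear to assemble.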
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 179566)
    Time Spent: 3h 50m  (was: 3h 40m)

> Investigate if Calcite can generate functions that we need
> ----------------------------------------------------------
>
>                 Key: BEAM-5112
>                 URL: https://issues.apache.org/jira/browse/BEAM-5112
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
