This is an automated email from the ASF dual-hosted git repository. apilloud pushed a commit to branch num in repository https://gitbox.apache.org/repos/asf/beam.git
commit a3b0f9a4456eaf750053f19b075ac317531bb4c4 Author: Andrew Pilloud <[email protected]> AuthorDate: Wed Jan 8 15:14:59 2020 -0800 [BEAM-8630] Use column numbers for BeamZetaSqlCalcRel --- .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 33 +++++++++------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java index 330fb2d..153df25 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -67,13 +66,14 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { private static final SqlDialect DIALECT = BeamBigQuerySqlDialect.DEFAULT; private final SqlImplementor.Context context; + private static String columnName(int i) { + return "_" + i; + } + public BeamZetaSqlCalcRel( RelOptCluster cluster, RelTraitSet traits, RelNode input, RexProgram program) { super(cluster, traits, input, program); - final IntFunction<SqlNode> fn = - i -> - new SqlIdentifier( - getProgram().getInputRowType().getFieldList().get(i).getName(), SqlParserPos.ZERO); + final IntFunction<SqlNode> fn = i -> new SqlIdentifier(columnName(i), SqlParserPos.ZERO); context = new 
BeamSqlUnparseContext(fn); } @@ -146,20 +146,21 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { @Setup public void setup() { AnalyzerOptions options = SqlAnalyzer.initAnalyzerOptions(); - for (Field field : inputSchema.getFields()) { + for (int i = 0; i < inputSchema.getFieldCount(); i++) { options.addExpressionColumn( - sanitize(field.getName()), ZetaSqlUtils.beamFieldTypeToZetaSqlType(field.getType())); + columnName(i), + ZetaSqlUtils.beamFieldTypeToZetaSqlType(inputSchema.getField(i).getType())); } // TODO[BEAM-8630]: use a single PreparedExpression for all condition and projects projectExps = new ArrayList<>(); for (String project : projects) { - PreparedExpression projectExp = new PreparedExpression(sanitize(project)); + PreparedExpression projectExp = new PreparedExpression(project); projectExp.prepare(options); projectExps.add(projectExp); } if (condition != null) { - conditionExp = new PreparedExpression(sanitize(condition)); + conditionExp = new PreparedExpression(condition); conditionExp.prepare(options); } } @@ -168,10 +169,11 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { public void processElement(ProcessContext c) { Map<String, Value> columns = new HashMap<>(); Row row = c.element(); - for (Field field : inputSchema.getFields()) { + for (int i = 0; i < inputSchema.getFieldCount(); i++) { columns.put( - sanitize(field.getName()), - ZetaSqlUtils.javaObjectToZetaSqlValue(row.getValue(field.getName()), field.getType())); + columnName(i), + ZetaSqlUtils.javaObjectToZetaSqlValue( + row.getValue(i), inputSchema.getField(i).getType())); } // TODO[BEAM-8630]: support parameters in expression evaluation @@ -201,12 +203,5 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { conditionExp.close(); } } - - // Replaces "$" with "_" because "$" is not allowed in a valid ZetaSQL identifier - // (ZetaSQL identifier syntax: [A-Za-z_][A-Za-z_0-9]*) - // TODO[BEAM-8630]: check if this is sufficient and correct, or even better 
fix this in Calcite - private static String sanitize(String identifier) { - return identifier.replaceAll("\\$", "_"); - } } }
