[
https://issues.apache.org/jira/browse/BEAM-5112?focusedWorklogId=179569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179569
]
ASF GitHub Bot logged work on BEAM-5112:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Dec/18 21:08
Start Date: 28/Dec/18 21:08
Worklog Time Spent: 10m
Work Description: apilloud commented on pull request #6417: [BEAM-5112]
Use Calcite codegen to implement BeamCalcRel
URL: https://github.com/apache/beam/pull/6417#discussion_r244408100
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -96,43 +224,243 @@ public boolean isInputSortRelAndLimitOnly() {
/** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. */
private static class CalcFn extends DoFn<Row, Row> {
- private BeamSqlExpressionExecutor executor;
- private Schema outputSchema;
+ private final String processElementBlock;
+ private final Schema outputSchema;
+ private transient @Nullable ScriptEvaluator se = null;
- public CalcFn(BeamSqlExpressionExecutor executor, Schema outputSchema) {
- super();
- this.executor = executor;
+ public CalcFn(String processElementBlock, Schema outputSchema) {
+ this.processElementBlock = processElementBlock;
this.outputSchema = outputSchema;
}
+ ScriptEvaluator compile() {
+ ScriptEvaluator se = new ScriptEvaluator();
+ se.setParameters(
+ new String[] {outputSchemaParam.name, processContextParam.name,
DataContext.ROOT.name},
+ new Class[] {
+ (Class) outputSchemaParam.getType(),
+ (Class) processContextParam.getType(),
+ (Class) DataContext.ROOT.getType()
+ });
+ try {
+ se.cook(processElementBlock);
+ } catch (CompileException e) {
+ throw new RuntimeException("Could not compile CalcFn: " +
processElementBlock, e);
+ }
+ return se;
+ }
+
@Setup
public void setup() {
- executor.prepare();
+ this.se = compile();
}
@ProcessElement
public void processElement(ProcessContext c) {
- Row inputRow = c.element();
- @Nullable
- List<Object> rawResultValues =
- executor.execute(inputRow, null,
BeamSqlExpressionEnvironments.forRow(inputRow, null));
-
- if (rawResultValues != null) {
- List<Object> castResultValues =
- IntStream.range(0, outputSchema.getFieldCount())
- .mapToObj(i -> castField(rawResultValues, i))
- .collect(toList());
-
c.output(Row.withSchema(outputSchema).addValues(castResultValues).build());
+ assert se != null;
+ try {
+ se.evaluate(new Object[] {outputSchema, c, CONTEXT_INSTANCE});
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(
+ "CalcFn failed to evaluate: " + processElementBlock, e.getCause());
+ }
+ }
+ }
+
+ private static final Map<TypeName, Type> rawTypeMap =
+ ImmutableMap.<TypeName, Type>builder()
+ .put(TypeName.BYTE, Byte.class)
+ .put(TypeName.INT16, Short.class)
+ .put(TypeName.INT32, Integer.class)
+ .put(TypeName.INT64, Long.class)
+ .put(TypeName.FLOAT, Float.class)
+ .put(TypeName.DOUBLE, Double.class)
+ .build();
+
+ private Expression castOutput(Expression value, FieldType toType) {
+ if (value.getType() == Object.class) {
+ // fast copy path, just pass object through
+ return value;
+ } else if (toType.getTypeName().isDateType()
+ && value.getType() instanceof Class
+ && !Types.isAssignableFrom(ReadableInstant.class, (Class)
value.getType())) {
+ Expression valueDateTime = value;
+ if (toType.getMetadata() == null) {
+ if (value.getType() == java.sql.Timestamp.class) {
+ valueDateTime =
Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG.method, valueDateTime);
+ }
+ } else if (Arrays.equals(toType.getMetadata(),
CalciteUtils.TIME.getMetadata())) {
+ if (value.getType() == java.sql.Time.class) {
+ valueDateTime = Expressions.call(BuiltInMethod.TIME_TO_INT.method,
valueDateTime);
+ }
+ } else if (Arrays.equals(toType.getMetadata(),
CalciteUtils.DATE.getMetadata())) {
+ if (value.getType() == java.sql.Date.class) {
+ valueDateTime = Expressions.call(BuiltInMethod.DATE_TO_INT.method,
valueDateTime);
+ }
+ valueDateTime = Expressions.multiply(valueDateTime,
Expressions.constant(MILLIS_PER_DAY));
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown DateTime type " + new String(toType.getMetadata(),
UTF_8));
+ }
+ valueDateTime =
+ Expressions.new_(
+ DateTime.class,
+ valueDateTime,
+ Expressions.parameter(DateTimeZone.class,
"org.joda.time.DateTimeZone.UTC"));
+
+ if (((Class) value.getType()).isPrimitive()) {
+ return valueDateTime;
+ } else {
+ return Expressions.condition(
+ Expressions.equal(value, Expressions.constant(null)),
+ Expressions.constant(null),
+ valueDateTime);
+ }
+ } else if (toType.getTypeName() == TypeName.DECIMAL
+ && value.getType() instanceof Class
+ && !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType()))
{
+ return Expressions.new_(BigDecimal.class, value);
+
+ } else if (value.getType() instanceof Class
+ && (((Class) value.getType()).isPrimitive()
+ || Types.isAssignableFrom(Number.class, (Class) value.getType())))
{
+ Type rawType = rawTypeMap.get(toType.getTypeName());
+ if (rawType != null) {
+ return Types.castIfNecessary(rawType, value);
+ }
+ }
+ return value;
+ }
+
+ private static class InputGetterImpl implements
RexToLixTranslator.InputGetter {
+ private static final Map<TypeName, String> typeGetterMap =
+ ImmutableMap.<TypeName, String>builder()
+ .put(TypeName.BYTE, "getByte")
+ .put(TypeName.BYTES, "getBytes")
+ .put(TypeName.INT16, "getInt16")
+ .put(TypeName.INT32, "getInt32")
+ .put(TypeName.INT64, "getInt64")
+ .put(TypeName.DECIMAL, "getDecimal")
+ .put(TypeName.FLOAT, "getFloat")
+ .put(TypeName.DOUBLE, "getDouble")
+ .put(TypeName.STRING, "getString")
+ .put(TypeName.DATETIME, "getDateTime")
+ .put(TypeName.BOOLEAN, "getBoolean")
+ .put(TypeName.MAP, "getMap")
+ .put(TypeName.ARRAY, "getArray")
+ .put(TypeName.ROW, "getRow")
+ .build();
+
+ private final Expression input;
+ private final Schema inputSchema;
+
+ private InputGetterImpl(Expression input, Schema inputSchema) {
+ this.input = input;
+ this.inputSchema = inputSchema;
+ }
+
+ @Override
+ public Expression field(BlockBuilder list, int index, Type storageType) {
+ if (index >= inputSchema.getFieldCount() || index < 0) {
+ throw new IllegalArgumentException("Unable to find field #" + index);
+ }
+
+ final Expression expression = list.append("current", input);
+ if (storageType == Object.class) {
+ return Expressions.convert_(
+ Expressions.call(expression, "getValue",
Expressions.constant(index)), Object.class);
+ }
+ FieldType fromType = inputSchema.getField(index).getType();
+ String getter = typeGetterMap.get(fromType.getTypeName());
+ if (getter == null) {
+ throw new IllegalArgumentException("Unable to get " +
fromType.getTypeName());
+ }
+
+ Expression field = Expressions.call(expression, getter,
Expressions.constant(index));
+ if (fromType.getTypeName().isDateType()) {
+ field = Expressions.call(field, "getMillis");
+ if (Arrays.equals(fromType.getMetadata(),
CalciteUtils.TIME.getMetadata())) {
+ field = Expressions.convert_(field, int.class);
+ } else if (Arrays.equals(fromType.getMetadata(),
CalciteUtils.DATE.getMetadata())) {
+ field =
+ Expressions.convert_(
+ Expressions.modulo(field,
Expressions.constant(MILLIS_PER_DAY)), int.class);
+ } else if (fromType.getMetadata() != null) {
+ throw new IllegalArgumentException(
+ "Unknown DateTime type " + new String(fromType.getMetadata(),
UTF_8));
+ }
+ } else if (fromType.getTypeName().isCompositeType()
+ || (fromType.getTypeName().isCollectionType()
+ &&
fromType.getCollectionElementType().getTypeName().isCompositeType())) {
+ field = Expressions.call(WrappedList.class, "of", field);
+ }
+ return field;
+ }
+ }
+
+ private static final DataContext CONTEXT_INSTANCE = new SlimDataContext();
+
+ private static class SlimDataContext implements DataContext {
+ @Override
+ public SchemaPlus getRootSchema() {
+ return null;
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory() {
+ return null;
+ }
+
+ @Override
+ public QueryProvider getQueryProvider() {
+ return null;
+ }
+
+ @Override
+ public Object get(String name) {
Review comment:
Added. `/* DataContext.get is used to fetch "global" state inside the
generated code */`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 179569)
Time Spent: 4h 20m (was: 4h 10m)
> Investigate if Calcite can generate functions that we need
> ----------------------------------------------------------
>
> Key: BEAM-5112
> URL: https://issues.apache.org/jira/browse/BEAM-5112
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)