[
https://issues.apache.org/jira/browse/BEAM-5203?focusedWorklogId=160770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160770
]
ASF GitHub Bot logged work on BEAM-5203:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/18 18:52
Start Date: 30/Oct/18 18:52
Worklog Time Spent: 10m
Work Description: XuMingmin closed pull request #6874: [WIP] [BEAM-5203]
expose PaneInfo and BoundedWindow as UDF
URL: https://github.com/apache/beam/pull/6874
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
index ca3c072faa4..f0f04fd288a 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
/**
@@ -31,8 +32,12 @@
/** invoked before data processing. */
void prepare();
- /** apply transformation to input record {@link Row} with {@link
BoundedWindow}. */
- List<Object> execute(Row inputRow, BoundedWindow window,
BeamSqlExpressionEnvironment env);
+ /**
+ * apply transformation to input record {@link Row} with {@link
BoundedWindow} and {@link
+ * PaneInfo}.
+ */
+ List<Object> execute(
+ Row inputRow, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env);
void close();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index dec5217fa37..207187ca7f6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -90,8 +90,13 @@
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamBoundedWindowUdf;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamBoundedWindowUdf.BoundedWindowFunc;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamPaneInfoUdf;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamPaneInfoUdf.PaneFunc;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
@@ -501,6 +506,28 @@ private static BeamSqlExpression
getBeamSqlExpression(RexNode rexNode) {
ret = new BeamSqlIsNotNullExpression(subExps.get(0));
break;
+ // PaneInfo and BoundedWindow
+ case "FIRST_PANE":
+ ret = new
BeamPaneInfoUdf.BeamUdfIsFirstPane(PaneFunc.valueOf(opName));
+ break;
+ case "LAST_PANE":
+ ret = new
BeamPaneInfoUdf.BeamUdfIsLastPane(PaneFunc.valueOf(opName));
+ break;
+ case "PANE_TIMING":
+ ret = new BeamPaneInfoUdf.BeamUdfPaneIndex(PaneFunc.valueOf(opName));
+ break;
+ case "PANE_INDEX":
+ ret = new
BeamPaneInfoUdf.BeamUdfPaneTiming(PaneFunc.valueOf(opName));
+ break;
+ case "WINDOW_TYPE":
+ ret = new
BeamBoundedWindowUdf.BeamUdfWindowType(BoundedWindowFunc.valueOf(opName));
+ break;
+ case "WINDOW_START":
+ ret = new
BeamBoundedWindowUdf.BeamUdfWindowStart(BoundedWindowFunc.valueOf(opName));
+ break;
+ case "WINDOW_END":
+ ret = new
BeamBoundedWindowUdf.BeamUdfWindowEnd(BoundedWindowFunc.valueOf(opName));
+ break;
default:
// handle UDF
if (((RexCall) rexNode).getOperator() instanceof
SqlUserDefinedFunction) {
@@ -531,7 +558,7 @@ public void prepare() {}
@Override
public @Nullable List<Object> execute(
- Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
+ Row inputRow, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env) {
final BeamSqlExpressionEnvironment localEnv =
env.copyWithLocalRefExprs(exprs);
@@ -540,7 +567,7 @@ public void prepare() {}
if (conditionResult) {
return projections
.stream()
- .map(project -> project.evaluate(inputRow, window,
localEnv).getValue())
+ .map(project -> project.evaluate(inputRow, window, paneInfo,
localEnv).getValue())
.collect(Collectors.toList());
} else {
return null;
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
index 8bab25dc3c3..3dfa409b59d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -21,6 +21,7 @@
import java.util.List;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -50,14 +51,24 @@ public SqlTypeName opType(int idx) {
return op(idx).getOutputType();
}
- public Object opValueEvaluated(
+ public <T> T opValueEvaluated(
+ int idx, Row row, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env) {
+ return opValueEvaluated(idx, row, window, env);
+ }
+
+ public <T> T opValueEvaluated(
int idx, Row row, BoundedWindow window, BeamSqlExpressionEnvironment
env) {
- return op(idx).evaluate(row, window, env).getValue();
+ return (T) op(idx).evaluate(row, window, env).getValue();
}
/** assertion to make sure the input and output are supported in this
expression. */
public abstract boolean accept();
+ public BeamSqlPrimitive evaluate(
+ Row inputRow, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env) {
+ return evaluate(inputRow, window, env);
+ }
+
/**
* Apply input record {@link Row} with {@link BoundedWindow} to this
expression, the output value
* is wrapped with {@link BeamSqlPrimitive}.
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamBoundedWindowUdf.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamBoundedWindowUdf.java
new file mode 100644
index 00000000000..2aa1b1ec89c
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamBoundedWindowUdf.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** UDF to expose {@link BoundedWindow}. */
+public class BeamBoundedWindowUdf extends BeamSqlExpression {
+ /** supported window functions. */
+ public enum BoundedWindowFunc {
+ WINDOW_TYPE,
+ WINDOW_START,
+ WINDOW_END;
+ }
+
+ private BoundedWindowFunc func;
+
+ public BeamBoundedWindowUdf(BoundedWindowFunc func) {
+ this.func = func;
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(
+ Row inputRow, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env) {
+ switch (func) {
+ case WINDOW_TYPE:
+ return BeamSqlPrimitive.<String>of(
+ SqlTypeName.VARCHAR, window instanceof GlobalWindow ? "GLOBAL" :
"INTERVAL");
+ case WINDOW_START:
+ return BeamSqlPrimitive.<Date>of(
+ SqlTypeName.TIMESTAMP,
+ window instanceof GlobalWindow
+ ? window.TIMESTAMP_MIN_VALUE.toDate()
+ : ((IntervalWindow) window).start().toDate());
+ case WINDOW_END:
+ return BeamSqlPrimitive.<Date>of(
+ SqlTypeName.TIMESTAMP,
+ window instanceof GlobalWindow
+ ? window.TIMESTAMP_MAX_VALUE.toDate()
+ : ((IntervalWindow) window).end().toDate());
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(
+ Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
+ throw new IllegalStateException();
+ }
+
+ /** UDF instance for WINDOW_TYPE. */
+ public static class BeamUdfWindowType extends BeamBoundedWindowUdf
implements BeamSqlUdf {
+
+ public BeamUdfWindowType(BoundedWindowFunc func) {
+ super(func);
+ }
+
+ public static String eval() {
+ return "UNKNOWN";
+ }
+ }
+
+ /** UDF instance for WINDOW_START. */
+ public static class BeamUdfWindowStart extends BeamBoundedWindowUdf
implements BeamSqlUdf {
+
+ public BeamUdfWindowStart(BoundedWindowFunc func) {
+ super(func);
+ }
+
+ public static Date eval() {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE.toDate();
+ }
+ }
+
+ /** UDF instance for WINDOW_END. */
+ public static class BeamUdfWindowEnd extends BeamBoundedWindowUdf implements
BeamSqlUdf {
+
+ public BeamUdfWindowEnd(BoundedWindowFunc func) {
+ super(func);
+ }
+
+ public static Date eval() {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE.toDate();
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamPaneInfoUdf.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamPaneInfoUdf.java
new file mode 100644
index 00000000000..b676734199d
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamPaneInfoUdf.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** UDF to expose {@link PaneInfo}. */
+public class BeamPaneInfoUdf extends BeamSqlExpression {
+ /** supported pane functions. */
+ public enum PaneFunc {
+ FIRST_PANE,
+ LAST_PANE,
+ PANE_TIMING,
+ PANE_INDEX;
+ }
+
+ private PaneFunc func;
+
+ public BeamPaneInfoUdf(PaneFunc func) {
+ this.func = func;
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ public BeamSqlPrimitive evaluate(
+ Row inputRow, BoundedWindow window, PaneInfo paneInfo,
BeamSqlExpressionEnvironment env) {
+ switch (func) {
+ case FIRST_PANE:
+ return BeamSqlPrimitive.<Integer>of(SqlTypeName.INTEGER,
paneInfo.isFirst() ? 1 : 0);
+ case LAST_PANE:
+ return BeamSqlPrimitive.<Integer>of(SqlTypeName.INTEGER,
paneInfo.isLast() ? 1 : 0);
+ case PANE_INDEX:
+ return BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT,
paneInfo.getIndex());
+ case PANE_TIMING:
+ return BeamSqlPrimitive.<String>of(SqlTypeName.VARCHAR,
paneInfo.getTiming().name());
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(
+ Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
+ throw new IllegalStateException();
+ }
+
+ /** UDF instance for FIRST_PANE. */
+ public static class BeamUdfIsFirstPane extends BeamPaneInfoUdf implements
BeamSqlUdf {
+
+ public BeamUdfIsFirstPane(PaneFunc func) {
+ super(func);
+ }
+
+ public static Integer eval() {
+ return -1;
+ }
+ }
+
+ /** UDF instance for LAST_PANE. */
+ public static class BeamUdfIsLastPane extends BeamPaneInfoUdf implements
BeamSqlUdf {
+
+ public BeamUdfIsLastPane(PaneFunc func) {
+ super(func);
+ }
+
+ public static Integer eval() {
+ return -1;
+ }
+ }
+
+ /** UDF instance for PANE_INDEX. */
+ public static class BeamUdfPaneIndex extends BeamPaneInfoUdf implements
BeamSqlUdf {
+
+ public BeamUdfPaneIndex(PaneFunc func) {
+ super(func);
+ }
+
+ public static Long eval() {
+ return -1L;
+ }
+ }
+
+ /** UDF instance for PANE_TIMING. */
+ public static class BeamUdfPaneTiming extends BeamPaneInfoUdf implements
BeamSqlUdf {
+
+ public BeamUdfPaneTiming(PaneFunc func) {
+ super(func);
+ }
+
+ public static String eval() {
+ return PaneInfo.Timing.UNKNOWN.name();
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamUdfUdafRegister.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamUdfUdafRegister.java
new file mode 100644
index 00000000000..2f6028b28e6
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/BeamUdfUdafRegister.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamBoundedWindowUdf.BoundedWindowFunc;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf.BeamPaneInfoUdf.PaneFunc;
+import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
+
+/** Register UDF/UDAF provided by Apache Beam. */
+@AutoService(UdfUdafProvider.class)
+public class BeamUdfUdafRegister implements UdfUdafProvider {
+
+ @Override
+ public Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
+ Map<String, Class<? extends BeamSqlUdf>> udfs =
+ new HashMap<String, Class<? extends BeamSqlUdf>>();
+ udfs.put(PaneFunc.FIRST_PANE.name(),
BeamPaneInfoUdf.BeamUdfIsFirstPane.class);
+ udfs.put(PaneFunc.LAST_PANE.name(),
BeamPaneInfoUdf.BeamUdfIsLastPane.class);
+ udfs.put(PaneFunc.PANE_INDEX.name(),
BeamPaneInfoUdf.BeamUdfPaneIndex.class);
+ udfs.put(PaneFunc.PANE_TIMING.name(),
BeamPaneInfoUdf.BeamUdfPaneTiming.class);
+
+ udfs.put(BoundedWindowFunc.WINDOW_TYPE.name(),
BeamBoundedWindowUdf.BeamUdfWindowType.class);
+ udfs.put(BoundedWindowFunc.WINDOW_START.name(),
BeamBoundedWindowUdf.BeamUdfWindowStart.class);
+ udfs.put(BoundedWindowFunc.WINDOW_END.name(),
BeamBoundedWindowUdf.BeamUdfWindowEnd.class);
+
+ return ImmutableMap.copyOf(udfs);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/package-info.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/package-info.java
new file mode 100644
index 00000000000..69092964387
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/udf/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * UDF to expose {@link org.apache.beam.sdk.transforms.windowing.PaneInfo} and
{@link
+ * org.apache.beam.sdk.transforms.windowing.BoundedWindow} details.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.udf;
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index bbd9194accb..15c37cba6be 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -32,6 +32,8 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
@@ -111,11 +113,12 @@ public void setup() {
}
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(ProcessContext c, BoundedWindow window,
PaneInfo paneInfo) {
Row inputRow = c.element();
@Nullable
List<Object> rawResultValues =
- executor.execute(inputRow, null,
BeamSqlExpressionEnvironments.forRow(inputRow, null));
+ executor.execute(
+ inputRow, window, paneInfo,
BeamSqlExpressionEnvironments.forRow(inputRow, null));
if (rawResultValues != null) {
List<Object> castResultValues =
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 5b23151e795..419c4fdb1d5 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.calcite.linq4j.function.Parameter;
+import org.joda.time.Instant;
import org.junit.Test;
/** Tests for UDF/UDAF. */
@@ -155,6 +156,37 @@ public void testAutoUdfUdaf() throws Exception {
pipeline.run().waitUntilFinish();
}
+ /** test window/pane UDFs. */
+ @Test
+ public void testWindowPaneUdf() throws Exception {
+ Schema resultType =
+ Schema.builder()
+ .addInt32Field("f_int2")
+ .addStringField("window_type")
+ .addDateTimeField("window_start")
+ .addDateTimeField("window_end")
+ .addInt32Field("first_pane")
+ .addInt32Field("last_pane")
+ .addStringField("pane_timing")
+ .addInt64Field("pane_index")
+ .build();
+
+ Row row =
+ Row.withSchema(resultType)
+ .addValues(0, "GLOBAL", Instant.now(), Instant.now(), 1, 1,
"ON_TIME", 0)
+ .build();
+
+ String sql =
+ "SELECT f_int2, FIRST_PANE(), LAST_PANE(), PANE_TIMING(), PANE_INDEX()"
+ + ", WINDOW_TYPE(), WINDOW_START(), WINDOW_END()"
+ + " FROM PCOLLECTION GROUP BY f_int2";
+ PCollection<Row> result =
+ boundedInput1.apply("testUdaf",
SqlTransform.query(sql).withAutoUdfUdafLoad(true));
+
+ PAssert.that(result).containsInAnyOrder(row);
+ pipeline.run().waitUntilFinish();
+ }
+
/** Auto provider for test. */
@AutoService(UdfUdafProvider.class)
public static class UdfUdafProviderTest implements UdfUdafProvider {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 160770)
Time Spent: 0.5h (was: 20m)
> expose PaneInfo and BoundedWindow as UDF
> ----------------------------------------
>
> Key: BEAM-5203
> URL: https://issues.apache.org/jira/browse/BEAM-5203
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Xu Mingmin
> Assignee: Xu Mingmin
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> besides adding new keywords in Calcite, there's an alternative way to expose
> PaneInfo and BoundedWindow of Row by UDF.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)