[
https://issues.apache.org/jira/browse/BEAM-5103?focusedWorklogId=133796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133796
]
ASF GitHub Bot logged work on BEAM-5103:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Aug/18 20:05
Start Date: 10/Aug/18 20:05
Worklog Time Spent: 10m
Work Description: akedin closed pull request #6175: [BEAM-5103][SQL]test
aggregation functions at DSL level
URL: https://github.com/apache/beam/pull/6175
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
index 201059e92b4..7b035984b5d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
@@ -568,6 +568,133 @@ public void testComparisonOperatorFunction() {
checker.buildRunAndCheck();
}
+ @Test
+ @SqlOperatorTest(name = "MAX", kind = "MAX")
+ public void testMax() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("MAX(c_tinyint)", (byte) 3)
+ .addExpr("MAX(c_smallint)", (short) 3)
+ .addExpr("MAX(c_integer)", 3)
+ .addExpr("MAX(c_bigint)", 3L)
+ .addExpr("MAX(c_float)", 3.0f)
+ .addExpr("MAX(c_double)", 3.0)
+ .addExpr("MAX(c_decimal)", BigDecimal.valueOf(3.0))
+ .addExpr("MAX(ts)", parseDate("1986-04-15 11:35:26"));
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "MIN", kind = "MIN")
+ public void testMin() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("MIN(c_tinyint)", (byte) 1)
+ .addExpr("MIN(c_smallint)", (short) 1)
+ .addExpr("MIN(c_integer)", 1)
+ .addExpr("MIN(c_bigint)", 1L)
+ .addExpr("MIN(c_float)", 1.0f)
+ .addExpr("MIN(c_double)", 1.0)
+ .addExpr("MIN(c_decimal)", BigDecimal.valueOf(1.0))
+ .addExpr("MIN(ts)", parseDate("1986-02-15 11:35:26"));
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "SUM", kind = "SUM")
+ public void testSum() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("SUM(c_tinyint)", (byte) 6)
+ .addExpr("SUM(c_smallint)", (short) 6)
+ .addExpr("SUM(c_integer)", 6)
+ .addExpr("SUM(c_bigint)", 6L)
+ .addExpr("SUM(c_float)", 6.0f)
+ .addExpr("SUM(c_double)", 6.0)
+ .addExpr("SUM(c_decimal)", BigDecimal.valueOf(6.0));
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "AVG", kind = "AVG")
+ public void testAvg() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("AVG(c_tinyint)", (byte) 2)
+ .addExpr("AVG(c_smallint)", (short) 2)
+ .addExpr("AVG(c_integer)", 2)
+ .addExpr("AVG(c_bigint)", 2L)
+ .addExpr("AVG(c_float)", 2.0f)
+ .addExpr("AVG(c_double)", 2.0)
+ .addExpr("AVG(c_decimal)", BigDecimal.valueOf(2.0));
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Ignore("https://issues.apache.org/jira/browse/BEAM-5111")
+ @Test
+ @SqlOperatorTest(name = "$SUM0", kind = "SUM0")
+ public void testSUM0() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("$SUM0(c_tinyint)", (byte) 6)
+ .addExpr("$SUM0(c_smallint)", (short) 6)
+ .addExpr("$SUM0(c_integer)", 6)
+ .addExpr("$SUM0(c_bigint)", 6L)
+ .addExpr("$SUM0(c_float)", 6.0f)
+ .addExpr("$SUM0(c_double)", 6.0)
+ .addExpr("$SUM0(c_decimal)", BigDecimal.valueOf(6.0));
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "COUNT", kind = "COUNT")
+ public void testCount() {
+ ExpressionChecker checker =
+ new ExpressionChecker().addExpr("COUNT(*)", 4L).addExpr("COUNT(1)", 4L);
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "VAR_POP", kind = "VAR_POP")
+ public void testVARPOP() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("VAR_POP(c_integer)", 0)
+ .addExpr("VAR_POP(c_double)", 0.6666666);
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "VAR_SAMP", kind = "VAR_SAMP")
+ public void testVARSAMP() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("VAR_SAMP(c_integer)", 1)
+ .addExpr("VAR_SAMP(c_double)", 1.0);
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "COVAR_POP", kind = "COVAR_POP")
+ public void testCOVARPOP() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("COVAR_POP(c_integer, c_integer_two)", 0)
+ .addExpr("COVAR_POP(c_double, c_double_two)", 0.6666666);
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
+ @Test
+ @SqlOperatorTest(name = "COVAR_SAMP", kind = "COVAR_SAMP")
+ public void testAggrationFunctions() {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("COVAR_SAMP(c_integer, c_integer_two)", 1)
+ .addExpr("COVAR_SAMP(c_double, c_double_two)", 1.0);
+
+ checker.buildRunAndCheck(getAggregationTestPCollection());
+ }
+
@Test
@SqlOperatorTest(name = "CHARACTER_LENGTH", kind = "OTHER_FUNCTION")
@SqlOperatorTest(name = "CHAR_LENGTH", kind = "OTHER_FUNCTION")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 4380b0b7241..b165d392a9d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.extensions.sql.integrationtest;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.sdk.extensions.sql.utils.RowAsserts.matchesScalar;
import static org.junit.Assert.assertTrue;
import com.google.auto.value.AutoValue;
@@ -57,6 +58,8 @@
/** Base class for all built-in functions integration tests. */
public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+ private static final double PRECISION_DOUBLE = 1e-7;
+ private static final float PRECISION_FLOAT = 1e-7f;
private static final Map<Class, TypeName> JAVA_CLASS_TO_TYPENAME =
ImmutableMap.<Class, TypeName>builder()
@@ -88,6 +91,20 @@
.addInt64Field("c_bigint_max")
.build();
+ private static final Schema ROW_TYPE_TWO =
+ Schema.builder()
+ .addNullableField("ts", FieldType.DATETIME)
+ .addNullableField("c_tinyint", FieldType.BYTE)
+ .addNullableField("c_smallint", FieldType.INT16)
+ .addNullableField("c_integer", FieldType.INT32)
+ .addNullableField("c_integer_two", FieldType.INT32)
+ .addNullableField("c_bigint", FieldType.INT64)
+ .addNullableField("c_float", FieldType.FLOAT)
+ .addNullableField("c_double", FieldType.DOUBLE)
+ .addNullableField("c_double_two", FieldType.DOUBLE)
+ .addNullableField("c_decimal", FieldType.DECIMAL)
+ .build();
+
@Rule public final TestPipeline pipeline = TestPipeline.create();
protected PCollection<Row> getTestPCollection() {
@@ -113,6 +130,50 @@
}
}
+ protected PCollection<Row> getAggregationTestPCollection() {
+ try {
+ return MockedBoundedTable.of(ROW_TYPE_TWO)
+ .addRows(
+ parseDate("1986-02-15 11:35:26"),
+ (byte) 1,
+ (short) 1,
+ 1,
+ 5,
+ 1L,
+ 1.0f,
+ 1.0,
+ 7.0,
+ BigDecimal.valueOf(1.0))
+ .addRows(
+ parseDate("1986-03-15 11:35:26"),
+ (byte) 2,
+ (short) 2,
+ 2,
+ 6,
+ 2L,
+ 2.0f,
+ 2.0,
+ 8.0,
+ BigDecimal.valueOf(2.0))
+ .addRows(
+ parseDate("1986-04-15 11:35:26"),
+ (byte) 3,
+ (short) 3,
+ 3,
+ 7,
+ 3L,
+ 3.0f,
+ 3.0,
+ 9.0,
+ BigDecimal.valueOf(3.0))
+ .addRows(null, null, null, null, null, null, null, null, null, null)
+ .buildIOReader(pipeline.begin())
+ .setRowSchema(ROW_TYPE_TWO);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
protected static DateTime parseDate(String str) {
return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC().parseDateTime(str);
}
@@ -152,12 +213,14 @@ private static ExpressionTestCase of(
private transient List<ExpressionTestCase> exps = new ArrayList<>();
public ExpressionChecker addExpr(String expression, Object expectedValue) {
- // Because of erasure, we can only automatically infer non-parameterized
types
TypeName resultTypeName =
JAVA_CLASS_TO_TYPENAME.get(expectedValue.getClass());
checkArgument(
resultTypeName != null,
- "Could not infer a Beam type for %s."
- + " Parameterized types must be provided explicitly.");
+ String.format(
+ "The type of the expected value '%s' is unknown in 'addExpr(String expression, "
+ + "Object expectedValue)'. Please use 'addExpr(String expr, Object expected, "
+ + "FieldType type)' instead and provide the type of the expected object",
+ expectedValue));
addExpr(expression, expectedValue, FieldType.of(resultTypeName));
return this;
}
@@ -168,9 +231,12 @@ public ExpressionChecker addExpr(
return this;
}
- /** Build the corresponding SQL, compile to Beam Pipeline, run it, and
check the result. */
public void buildRunAndCheck() {
- PCollection<Row> inputCollection = getTestPCollection();
+ buildRunAndCheck(getTestPCollection());
+ }
+
+ /** Build the corresponding SQL, compile to Beam Pipeline, run it, and
check the result. */
+ public void buildRunAndCheck(PCollection<Row> inputCollection) {
for (ExpressionTestCase testCase : exps) {
String expression = testCase.sqlExpr();
@@ -181,8 +247,17 @@ public void buildRunAndCheck() {
PCollection<Row> output =
inputCollection.apply(testCase.toString(),
SqlTransform.query(sql));
- PAssert.that(output)
-
.containsInAnyOrder(TestUtils.RowsBuilder.of(schema).addRows(expectedValue).getRows());
+ // For floating point number(Double and Float), it's allowed to have
some precision delta,
+ // other types can use regular equality check.
+ if (expectedValue instanceof Double) {
+ PAssert.that(output).satisfies(matchesScalar((double) expectedValue,
PRECISION_DOUBLE));
+ } else if (expectedValue instanceof Float) {
+ PAssert.that(output).satisfies(matchesScalar((float) expectedValue,
PRECISION_FLOAT));
+ } else {
+ PAssert.that(output)
+ .containsInAnyOrder(
+
TestUtils.RowsBuilder.of(schema).addRows(expectedValue).getRows());
+ }
}
inputCollection.getPipeline().run();
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/RowAsserts.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/RowAsserts.java
index 4158c77d2a6..5e6d878c6a8 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/RowAsserts.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/RowAsserts.java
@@ -59,4 +59,15 @@
return null;
};
}
+
+ public static SerializableFunction<Iterable<Row>, Void> matchesScalar(
+ float expected, float delta) {
+
+ return input -> {
+ Row row = Iterables.getOnlyElement(input);
+ assertNotNull(row);
+ assertEquals(expected, row.getFloat(0), delta);
+ return null;
+ };
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 133796)
Time Spent: 5h 40m (was: 5.5h)
> Test aggregation functions at DSL levels
> ----------------------------------------
>
> Key: BEAM-5103
> URL: https://issues.apache.org/jira/browse/BEAM-5103
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> Typical aggregation functions include COUNT, SUM, MAX, MIN, etc.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)