[FLINK-3226] implement getUniqueName method in TranslationContext. This closes #1600 and #1567.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fff25df5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fff25df5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fff25df5 Branch: refs/heads/tableOnCalcite Commit: fff25df5ee720f2aec2be5a6309e31968ecdac09 Parents: 509c4b9 Author: vasia <[email protected]> Authored: Thu Feb 11 14:24:24 2016 +0100 Committer: vasia <[email protected]> Committed: Thu Feb 11 16:48:24 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/plan/RexNodeTranslator.scala | 2 +- .../flink/api/table/plan/TranslationContext.scala | 4 ++++ .../plan/nodes/dataset/DataSetGroupReduce.scala | 2 +- .../api/java/table/test/AggregationsITCase.java | 1 - .../api/scala/table/test/AggregationsITCase.scala | 15 ++++++++++++++- 5 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index 07e3924..bad111f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -35,7 +35,7 @@ object RexNodeTranslator { exp match { case agg: Aggregation => - val name = "TMP_" + agg.hashCode().toHexString.toUpperCase + val name = TranslationContext.getUniqueName val aggCall = toAggCall(agg, name, relBuilder) val fieldExp = new UnresolvedFieldReference(name) (fieldExp, List(aggCall)) 
http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index b2b0c2b..51af8d6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -70,6 +70,10 @@ object TranslationContext { } + def getUniqueName: String = { + "TMP_" + nameCntr.getAndIncrement() + } + def getRelBuilder: RelBuilder = { relBuilder } http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index ad7e0e9..afe09bb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -67,7 +67,7 @@ class DataSetGroupReduce( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType) // get the output types val fieldsNames = rowType.getFieldNames 
http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index 8e81893..bcb2308 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -62,7 +62,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase { super(mode); } - @Ignore //DataSetMap needs to be implemented @Test public void testAggregationTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/fff25df5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala index 64f6757..68cb1ed 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -32,7 +32,6 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Ignore //DataSetMap needs to be implemented @Test def testAggregationTypes(): Unit = { @@ -71,6 +70,20 @@ class 
AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val t = env.fromElements( + (1: Byte, 1: Short), + (2: Byte, 2: Short)).toTable + .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum) + + val expected = "1,3,2,1,3" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Ignore // it seems like the arithmetic expression is added to the field position @Test(expected = classOf[NotImplementedError]) def testAggregationWithArithmetic(): Unit = {
