[FLINK-3226] Add DataSet scan and conversion to DataSet[Row]. This closes #1579.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f60c84 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f60c84 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f60c84 Branch: refs/heads/tableOnCalcite Commit: 99f60c84092a5c656591d68f4d4450f14f88b9ba Parents: 3cb76fc Author: Fabian Hueske <[email protected]> Authored: Tue Feb 2 17:15:28 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Feb 12 11:34:09 2016 +0100 ---------------------------------------------------------------------- .../api/java/table/JavaBatchTranslator.scala | 2 + .../api/scala/table/ScalaBatchTranslator.scala | 8 +- .../flink/api/table/plan/PlanTranslator.scala | 87 ++++++++--- .../flink/api/table/plan/TypeConverter.scala | 15 ++ .../plan/nodes/dataset/DataSetSource.scala | 150 ++++++++++++++++++- .../plan/rules/dataset/DataSetScanRule.scala | 8 +- .../api/table/plan/schema/DataSetTable.scala | 41 ++--- .../flink/api/java/table/test/AsITCase.java | 63 ++++++-- .../flink/api/java/table/test/FilterITCase.java | 15 +- .../flink/api/java/table/test/SelectITCase.java | 15 +- .../flink/api/scala/table/test/AsITCase.scala | 40 ++++- .../api/scala/table/test/SelectITCase.scala | 28 ++-- 12 files changed, 380 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 7e91190..f70f477 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -40,11 +40,13 @@ class JavaBatchTranslator extends PlanTranslator { override def createTable[A]( repr: Representation[A], + fieldIndexes: Array[Int], fieldNames: Array[String]): Table = { // create table representation from DataSet val dataSetTable = new DataSetTable[A]( repr.asInstanceOf[JavaDataSet[A]], + fieldIndexes, fieldNames ) http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala index 1c453fa..cc92c37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala @@ -38,8 +38,12 @@ class ScalaBatchTranslator extends PlanTranslator { type Representation[A] = DataSet[A] - override def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table = { - javaTranslator.createTable(repr.javaSet, fieldNames) + override def createTable[A]( + repr: Representation[A], + fieldIndexes: Array[Int], + fieldNames: Array[String]): Table = + { + javaTranslator.createTable(repr.javaSet, fieldIndexes, fieldNames) } override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala index 4e97f83..af22768 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -18,10 +18,12 @@ package org.apache.flink.api.table.plan import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.parser.ExpressionParser -import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.expressions.{Naming, Expression, UnresolvedFieldReference} import org.apache.flink.api.table.Table import scala.language.reflectiveCalls @@ -42,7 +44,10 @@ abstract class PlanTranslator { /** * Creates a [[Table]] from a DataSet (the underlying representation). */ - def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table + def createTable[A]( + repr: Representation[A], + fieldIndexes: Array[Int], + fieldNames: Array[String]): Table /** * Creates a [[Table]] from the given DataSet. 
@@ -50,10 +55,15 @@ abstract class PlanTranslator { def createTable[A](repr: Representation[A]): Table = { val fieldNames: Array[String] = repr.getType() match { - case c: CompositeType[A] => c.getFieldNames - case tpe => Array() // createTable will throw an exception for this later + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case tpe => + throw new IllegalArgumentException( + s"Type $tpe requires explicit field naming with AS.") } - createTable(repr, fieldNames) + val fieldIndexes = fieldNames.indices.toArray + createTable(repr, fieldIndexes, fieldNames) } /** @@ -75,17 +85,60 @@ abstract class PlanTranslator { */ def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = { - val fieldNames: Array[String] = exprs - .map { - case ResolvedFieldReference(name, _) => - name - case UnresolvedFieldReference(name) => - name - case _ => - throw new IllegalArgumentException("Only field expressions allowed") - } - - createTable(repr, fieldNames) + val inputType = repr.getType() + + val indexedNames: Array[(Int, String)] = inputType match { + case a: AtomicType[A] => + if (exprs.length != 1) { + throw new IllegalArgumentException("Atomic type may can only have a single field.") + } + exprs.map { + case UnresolvedFieldReference(name) => (0, name) + case _ => throw new IllegalArgumentException( + "Field reference expression expected.") + } + case t: TupleTypeInfo[A] => + exprs.zipWithIndex.map { + case (UnresolvedFieldReference(name), idx) => (idx, name) + case (Naming(UnresolvedFieldReference(origName), name), _) => + val idx = t.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $t") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field reference expression or naming expression expected.") + } + case c: CaseClassTypeInfo[A] => + exprs.zipWithIndex.map { + case 
(UnresolvedFieldReference(name), idx) => (idx, name) + case (Naming(UnresolvedFieldReference(origName), name), _) => + val idx = c.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $c") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field reference expression or naming expression expected.") + } + case p: PojoTypeInfo[A] => + exprs.map { + case Naming(UnresolvedFieldReference(origName), name) => + val idx = p.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $p") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field naming expression expected.") + } + case tpe => throw new IllegalArgumentException( + s"Type $tpe cannot be converted into Table.") + } + + val (fieldIndexes, fieldNames) = indexedNames.unzip + + createTable(repr, fieldIndexes.toArray, fieldNames.toArray) } } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index 30a0589..f6fe2e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -22,19 +22,34 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, GenericTypeInfo} +import 
org.apache.flink.api.java.typeutils.ValueTypeInfo._ +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo object TypeConverter { def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { case BOOLEAN_TYPE_INFO => BOOLEAN + case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN case BYTE_TYPE_INFO => TINYINT + case BYTE_VALUE_TYPE_INFO => TINYINT case SHORT_TYPE_INFO => SMALLINT + case SHORT_VALUE_TYPE_INFO => SMALLINT case INT_TYPE_INFO => INTEGER + case INT_VALUE_TYPE_INFO => INTEGER case LONG_TYPE_INFO => BIGINT + case LONG_VALUE_TYPE_INFO => BIGINT case FLOAT_TYPE_INFO => FLOAT + case FLOAT_VALUE_TYPE_INFO => FLOAT case DOUBLE_TYPE_INFO => DOUBLE + case DOUBLE_VALUE_TYPE_INFO => DOUBLE case STRING_TYPE_INFO => VARCHAR + case STRING_VALUE_TYPE_INFO => VARCHAR case DATE_TYPE_INFO => DATE +// case t: TupleTypeInfo[_] => ROW +// case c: CaseClassTypeInfo[_] => ROW +// case p: PojoTypeInfo[_] => STRUCTURED +// case g: GenericTypeInfo[_] => OTHER case _ => ??? // TODO more types } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala index effaf1a..53067dc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala @@ -18,12 +18,25 @@ package org.apache.flink.api.table.plan.nodes.dataset +import java.lang.reflect.Field + import org.apache.calcite.plan._ -import org.apache.calcite.rel.{RelWriter, RelNode} +import org.apache.calcite.rel.RelNode import 
org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType} import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TypeConverter +import org.apache.flink.api.table.plan.schema.DataSetTable +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.configuration.Configuration + +import scala.collection.JavaConverters._ /** * Flink RelNode which matches along with DataSource. @@ -32,11 +45,12 @@ class DataSetSource( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType, - inputDataSet: DataSet[_]) + rowType: RelDataType) extends TableScan(cluster, traitSet, table) with DataSetRel { + val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]]) + override def deriveRowType() = rowType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { @@ -44,12 +58,136 @@ class DataSetSource( cluster, traitSet, table, - rowType, - inputDataSet + rowType ) } override def translateToPlan: DataSet[Any] = { - ??? 
+ + val inputDataSet: DataSet[Any] = dataSetTable.dataSet + + // extract Flink data types + val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala + .map(f => f.getType.getSqlTypeName) + .map(n => TypeConverter.sqlTypeToTypeInfo(n)) + .toArray + + val rowTypeInfo = new RowTypeInfo(fieldTypes, dataSetTable.fieldNames) + + // convert input data set into row data set + inputDataSet.getType match { + case t: TupleTypeInfo[_] => + val rowMapper = new TupleToRowMapper(dataSetTable.fieldIndexes) + inputDataSet.asInstanceOf[DataSet[Tuple]] + .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] + + case c: CaseClassTypeInfo[_] => + val rowMapper = new CaseClassToRowMapper(dataSetTable.fieldIndexes) + inputDataSet.asInstanceOf[DataSet[Product]] + .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] + + case p: PojoTypeInfo[_] => + // get pojo class + val typeClazz = p.getTypeClass.asInstanceOf[Class[Any]] + // get original field names + val origFieldNames = dataSetTable.fieldIndexes.map(i => p.getFieldNames()(i)) + + val rowMapper = new PojoToRowMapper(typeClazz, origFieldNames) + inputDataSet.asInstanceOf[DataSet[Any]] + .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] + + case a: AtomicType[_] => + val rowMapper = new AtomicToRowMapper + inputDataSet.asInstanceOf[DataSet[Any]] + .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] + } + } + +} + +class TupleToRowMapper(val fromIndexes: Array[Int]) + extends RichMapFunction[Tuple, Row] +{ + + @transient var outR: Row = null + + override def open(conf: Configuration): Unit = { + outR = new Row(fromIndexes.length) + } + + override def map(v: Tuple): Row = { + + var i = 0 + while (i < fromIndexes.length) { + outR.setField(i, v.getField(fromIndexes(i))) + i += 1 + } + outR + } +} + +class CaseClassToRowMapper(val fromIndexes: Array[Int]) + extends RichMapFunction[Product, Row] +{ + + @transient var outR: Row = null + + override def open(conf: Configuration): Unit 
= { + outR = new Row(fromIndexes.length) + } + + override def map(v: Product): Row = { + + var i = 0 + while (i < fromIndexes.length) { + outR.setField(i, v.productElement(fromIndexes(i))) + i += 1 + } + outR + } +} + +class PojoToRowMapper(val inClazz: Class[Any], val fieldNames: Array[String]) + extends RichMapFunction[Any, Row] +{ + + @transient var outR: Row = null + @transient var fields: Array[Field] = null + + override def open(conf: Configuration): Unit = { + + fields = fieldNames.map { n => + val f = inClazz.getField(n) + f.setAccessible(true) + f + } + outR = new Row(fieldNames.length) + } + + override def map(v: Any): Row = { + + var i = 0 + while (i < fields.length) { + outR.setField(i, fields(i).get(v)) + i += 1 + } + outR + } +} + +class AtomicToRowMapper() + extends RichMapFunction[Any, Row] +{ + + @transient var outR: Row = null + + override def open(conf: Configuration): Unit = { + outR = new Row(1) + } + + override def map(v: Any): Row = { + + outR.setField(0, v) + outR } } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala index 937f3e2..f995201 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala @@ -21,10 +21,8 @@ package org.apache.flink.api.table.plan.rules.dataset import org.apache.calcite.plan.{RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import 
org.apache.flink.api.java.DataSet import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention} -import org.apache.flink.api.table.plan.schema.DataSetTable class DataSetScanRule extends ConverterRule( @@ -33,17 +31,17 @@ class DataSetScanRule DataSetConvention.INSTANCE, "DataSetScanRule") { + def convert(rel: RelNode): RelNode = { val scan: FlinkScan = rel.asInstanceOf[FlinkScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val dataSet: DataSet[_] = scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet + new DataSetSource( rel.getCluster, traitSet, scan.getTable, - rel.getRowType, - dataSet + rel.getRowType ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala index e6aecab..75090a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala @@ -28,40 +28,45 @@ import org.apache.calcite.schema.Statistic import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.api.common.typeinfo.AtomicType import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.plan.TypeConverter class DataSetTable[T]( val dataSet: DataSet[T], + val fieldIndexes: Array[Int], val fieldNames: Array[String]) extends 
AbstractTable { + if (fieldIndexes.length != fieldNames.length) { + throw new IllegalArgumentException( + "Number of field indexes and field names must be equal.") + } + // check uniquenss of field names if (fieldNames.length != fieldNames.toSet.size) { - throw new scala.IllegalArgumentException( + throw new IllegalArgumentException( "Table field names must be unique.") } - val dataSetType: CompositeType[T] = + val fieldTypes: Array[SqlTypeName] = dataSet.getType match { case cType: CompositeType[T] => - cType - case _ => - throw new scala.IllegalArgumentException( - "DataSet must have a composite type.") - } - - val fieldTypes: Array[SqlTypeName] = - if (fieldNames.length == dataSetType.getArity) { - (0 until dataSetType.getArity) - .map(i => dataSetType.getTypeAt(i)) - .map(TypeConverter.typeInfoToSqlType) - .toArray - } - else { - throw new IllegalArgumentException( - "Arity of DataSet type not equal to number of field names.") + if (fieldNames.length != cType.getArity) { + throw new IllegalArgumentException( + s"Arity of DataSet type (" + cType.getFieldNames.deep + ") " + + "not equal to number of field names " + fieldNames.deep + ".") + } + fieldIndexes + .map(cType.getTypeAt(_)) + .map(TypeConverter.typeInfoToSqlType(_)) + case aType: AtomicType[T] => + if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { + throw new IllegalArgumentException( + "Non-composite input type may have only a single field and its index must be 0.") + } + Array(TypeConverter.typeInfoToSqlType(aType)) } override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java index f257c32..0f35d75 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -28,8 +28,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; +import java.util.ArrayList; import java.util.List; @RunWith(Parameterized.class) @@ -40,8 +40,8 @@ public class AsITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = NotImplementedError.class) - public void testAs() throws Exception { + @Test + public void testAsFromTuple() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -50,12 +50,39 @@ public class AsITCase extends MultipleProgramsTestBase { DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "15,5,Comment#9\n" + 
"16,6,Comment#10\n" + + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } + + @Test + public void testAsFromPojo() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + List<SmallPojo> data = new ArrayList<>(); + data.add(new SmallPojo("Peter", 28, 4000.00, "Sales")); + data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering")); + data.add(new SmallPojo("Lucy", 42, 6000.00, "HR")); + + Table table = + tableEnv.fromDataSet(env.fromCollection(data), + "department AS a, " + + "age AS b, " + + "salary AS c, " + + "name AS d"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = + "Sales,28,4000.0,Peter\n" + + "Engineering,56,10000.0,Anna\n" + + "HR,42,6000.0,Lucy\n"; compareResultAsText(results, expected); } @@ -129,5 +156,23 @@ public class AsITCase extends MultipleProgramsTestBase { String expected = ""; compareResultAsText(results, expected); } + + public static class SmallPojo { + + public SmallPojo() { } + + public SmallPojo(String name, int age, double salary, String department) { + this.name = name; + this.age = age; + this.salary = salary; + this.department = department; + } + + public String name; + public int age; + public double salary; + public String department; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index a3ab10f..cd08879 100644 --- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -60,7 +60,7 @@ public class FilterITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testAllPassingFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -75,12 +75,13 @@ public class FilterITCase extends MultipleProgramsTestBase { DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "21,6,Comment#15\n"; compareResultAsText(results, expected); } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index a3d31da..a66219c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -41,7 +41,7 @@ public class SelectITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = NotImplementedError.class) + @Test public void testSimpleSelectAllWithAs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -55,12 +55,13 @@ public class SelectITCase extends MultipleProgramsTestBase { DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "21,6,Comment#15\n"; compareResultAsText(results, expected); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala index 5ff2b82..6779d4c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala @@ -33,18 +33,38 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[NotImplementedError]) + @Test def testAs(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + @throws(classOf[Exception]) + def testAsFromCaseClass(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val data = List( + SomeCaseClass("Peter", 28, 4000.00, "Sales"), + SomeCaseClass("Anna", 56, 10000.00, "Engineering"), + SomeCaseClass("Lucy", 42, 6000.00, "HR")) + + val t = env.fromCollection(data).as('a, 'b, 'c, 'd) + + val expected: String = + "Peter,28,4000.0,Sales\n" + + "Anna,56,10000.0,Engineering\n" + + "Lucy,42,6000.0,HR\n" val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -105,4 +125,10 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } + +} + +case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) { + + def this() { this("", 0, 0.0, "") } } http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala index 4dadfe4..3700d67 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala @@ -33,34 +33,34 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[NotImplementedError]) + @Test def testSimpleSelectAll(): 
Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3) - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testSimpleSelectAllWithAs(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c) - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val expected = "1,1,Hi\n" + 
"2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) }
