Repository: flink
Updated Branches:
  refs/heads/release-1.6 4862101dd -> 7bb07e4e7


http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
index 7a98b0b..3f6426d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
+import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -84,18 +85,44 @@ abstract class DescriptorTestBase {
   }
 }
 
-class TestTableSourceDescriptor(connector: ConnectorDescriptor)
-  extends TableSourceDescriptor {
+class TestTableDescriptor(connector: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor[TestTableDescriptor]
+  with StreamableDescriptor[TestTableDescriptor] {
 
-  this.connectorDescriptor = Some(connector)
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+  private var updateMode: Option[String] = None
 
-  def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = {
+  override private[flink] def addProperties(properties: DescriptorProperties): 
Unit = {
+    connector.addProperties(properties)
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+  }
+
+  override def withFormat(format: FormatDescriptor): TestTableDescriptor = {
     this.formatDescriptor = Some(format)
     this
   }
 
-  def addSchema(schema: Schema): TestTableSourceDescriptor = {
+  override def withSchema(schema: Schema): TestTableDescriptor = {
     this.schemaDescriptor = Some(schema)
     this
   }
+
+  override def inAppendMode(): TestTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+    this
+  }
+
+  override def inRetractMode(): TestTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+    this
+  }
+
+  override def inUpsertMode(): TestTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+    this
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
new file mode 100644
index 0000000..ccac317
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Tests for [[TableDescriptor]].
+  */
+class TableDescriptorTest extends TableTestBase {
+
+  @Test
+  def testStreamTableSourceDescriptor(): Unit = {
+    testTableSourceDescriptor(true)
+  }
+
+  @Test
+  def testBatchTableSourceDescriptor(): Unit = {
+    testTableSourceDescriptor(false)
+  }
+
+  private def testTableSourceDescriptor(isStreaming: Boolean): Unit = {
+
+    val schema = Schema()
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+    // CSV table source and sink do not support proctime yet
+    //if (isStreaming) {
+    //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
+    //}
+
+    val connector = FileSystem()
+      .path("/path/to/csv")
+
+    val format = Csv()
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .fieldDelimiter("#")
+
+    val descriptor: RegistrableDescriptor = if (isStreaming) {
+      streamTestUtil().tableEnv
+        .connect(connector)
+        .withFormat(format)
+        .withSchema(schema)
+        .inAppendMode()
+    } else {
+      batchTestUtil().tableEnv
+        .connect(connector)
+        .withFormat(format)
+        .withSchema(schema)
+    }
+
+    // tests the table factory discovery and thus validates the result 
automatically
+    descriptor.registerTableSourceAndSink("MyTable")
+
+    val expectedCommonProperties = Seq(
+      "connector.property-version" -> "1",
+      "connector.type" -> "filesystem",
+      "connector.path" -> "/path/to/csv",
+      "format.property-version" -> "1",
+      "format.type" -> "csv",
+      "format.fields.0.name" -> "myfield",
+      "format.fields.0.type" -> "VARCHAR",
+      "format.fields.1.name" -> "myfield2",
+      "format.fields.1.type" -> "INT",
+      "format.field-delimiter" -> "#",
+      "schema.0.name" -> "myfield",
+      "schema.0.type" -> "VARCHAR",
+      "schema.1.name" -> "myfield2",
+      "schema.1.type" -> "INT"
+    )
+
+    val expectedProperties = if (isStreaming) {
+      expectedCommonProperties ++ Seq(
+        //"schema.2.name" -> "proctime",
+        //"schema.2.type" -> "TIMESTAMP",
+        //"schema.2.proctime" -> "true",
+        "update-mode" -> "append"
+      )
+    } else {
+      expectedCommonProperties
+    }
+
+    val actualProperties = new DescriptorProperties(true)
+    descriptor.addProperties(actualProperties)
+
+    assertEquals(expectedProperties.toMap.asJava, actualProperties.asMap)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
deleted file mode 100644
index a7dd644..0000000
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-
-/**
-  * Tests for [[TableSourceDescriptor]].
-  */
-class TableSourceDescriptorTest extends TableTestBase {
-
-  @Test
-  def testStreamTableSourceDescriptor(): Unit = {
-    testTableSourceDescriptor(true)
-  }
-
-  @Test
-  def testBatchTableSourceDescriptor(): Unit = {
-    testTableSourceDescriptor(false)
-  }
-
-  private def testTableSourceDescriptor(isStreaming: Boolean): Unit = {
-
-    val schema = Schema()
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-    if (isStreaming) {
-      schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
-    }
-
-    val connector = FileSystem()
-      .path("/path/to/csv")
-
-    val format = Csv()
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-
-    val descriptor = if (isStreaming) {
-      streamTestUtil().tableEnv
-        .from(connector)
-        .withFormat(format)
-        .withSchema(schema)
-    } else {
-      batchTestUtil().tableEnv
-        .from(connector)
-        .withFormat(format)
-        .withSchema(schema)
-    }
-
-    val expectedCommonProperties = Seq(
-      "connector.property-version" -> "1",
-      "connector.type" -> "filesystem",
-      "connector.path" -> "/path/to/csv",
-      "format.property-version" -> "1",
-      "format.type" -> "csv",
-      "format.fields.0.name" -> "myfield",
-      "format.fields.0.type" -> "VARCHAR",
-      "format.fields.1.name" -> "myfield2",
-      "format.fields.1.type" -> "INT",
-      "format.quote-character" -> ";",
-      "format.field-delimiter" -> "#",
-      "format.line-delimiter" -> "\r\n",
-      "format.comment-prefix" -> "%%",
-      "format.ignore-first-line" -> "true",
-      "format.ignore-parse-errors" -> "true",
-      "schema.0.name" -> "myfield",
-      "schema.0.type" -> "VARCHAR",
-      "schema.1.name" -> "myfield2",
-      "schema.1.type" -> "INT"
-    )
-
-    val expectedProperties = if (isStreaming) {
-      expectedCommonProperties ++ Seq(
-        "schema.2.name" -> "proctime",
-        "schema.2.type" -> "TIMESTAMP",
-        "schema.2.proctime" -> "true"
-      )
-    } else {
-      expectedCommonProperties
-    }
-
-    val actualProperties = new DescriptorProperties(true)
-    descriptor.addProperties(actualProperties)
-
-    assertEquals(expectedProperties.toMap.asJava, actualProperties.asMap)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index ac9894c..e62396f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -62,13 +62,13 @@ object CommonTestData {
     )
   }
 
-  def getInMemoryTestCatalog: ExternalCatalog = {
+  def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {
     val csvRecord1 = Seq(
       "1#1#Hi",
       "2#2#Hello",
       "3#2#Hello world"
     )
-    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", 
"tmp")
+    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), 
"csv-test1", "tmp")
 
     val connDesc1 = FileSystem().path(tempFilePath1)
     val formatDesc1 = Csv()
@@ -76,13 +76,17 @@ object CommonTestData {
       .field("b", Types.LONG)
       .field("c", Types.STRING)
       .fieldDelimiter("#")
-      .lineDelimiter("$")
     val schemaDesc1 = Schema()
       .field("a", Types.INT)
       .field("b", Types.LONG)
       .field("c", Types.STRING)
-    val externalCatalogTable1 = new ExternalCatalogTable(
-      connDesc1, Some(formatDesc1), Some(schemaDesc1), None, None)
+    val externalTableBuilder1 = ExternalCatalogTable.builder(connDesc1)
+      .withFormat(formatDesc1)
+      .withSchema(schemaDesc1)
+
+    if (isStreaming) {
+      externalTableBuilder1.inAppendMode()
+    }
 
     val csvRecord2 = Seq(
       "1#1#0#Hallo#1",
@@ -101,7 +105,7 @@ object CommonTestData {
       "5#14#13#JKL#2",
       "5#15#14#KLM#2"
     )
-    val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", 
"tmp")
+    val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), 
"csv-test2", "tmp")
 
     val connDesc2 = FileSystem().path(tempFilePath2)
     val formatDesc2 = Csv()
@@ -111,15 +115,19 @@ object CommonTestData {
       .field("g", Types.STRING)
       .field("h", Types.LONG)
       .fieldDelimiter("#")
-      .lineDelimiter("$")
     val schemaDesc2 = Schema()
       .field("d", Types.INT)
       .field("e", Types.LONG)
       .field("f", Types.INT)
       .field("g", Types.STRING)
       .field("h", Types.LONG)
-    val externalCatalogTable2 = new ExternalCatalogTable(
-      connDesc2, Some(formatDesc2), Some(schemaDesc2), None, None)
+    val externalTableBuilder2 = ExternalCatalogTable.builder(connDesc2)
+      .withFormat(formatDesc2)
+      .withSchema(schemaDesc2)
+
+    if (isStreaming) {
+      externalTableBuilder2.inAppendMode()
+    }
 
     val catalog = new InMemoryExternalCatalog("test")
     val db1 = new InMemoryExternalCatalog("db1")
@@ -128,9 +136,9 @@ object CommonTestData {
     catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
 
     // Register the table with both catalogs
-    catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
-    db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
-    db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false)
+    catalog.createTable("tb1", externalTableBuilder1.asTableSource(), 
ignoreIfExists = false)
+    db1.createTable("tb1", externalTableBuilder1.asTableSource(), 
ignoreIfExists = false)
+    db2.createTable("tb2", externalTableBuilder2.asTableSource(), 
ignoreIfExists = false)
     catalog
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c25f30f..87dbc91 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.utils
 import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, 
TableEnvironment}
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
TableSourceDescriptor}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
TableDescriptor}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 
@@ -55,5 +55,5 @@ class MockTableEnvironment extends TableEnvironment(new 
TableConfig) {
 
   override def explain(table: Table): String = ???
 
-  override def from(connectorDescriptor: ConnectorDescriptor): 
TableSourceDescriptor = ???
+  override def connect(connectorDescriptor: ConnectorDescriptor): 
TableDescriptor = ???
 }

Reply via email to