This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b53eef53271e1f73cfe3d7e0b4fe93fb2b74b5f
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Thu Jul 18 20:59:40 2019 +0800

    [FLINK-13286][table-common] Port FileSystem and FileSystemValidator to common
---
 .../apache/flink/table/descriptors/FileSystem.java | 58 ++++++++++++++++++++
 .../table/descriptors/FileSystemValidator.java}    | 31 +++++------
 .../flink/table/descriptors/FileSystem.scala       | 64 ----------------------
 .../flink/table/descriptors/FileSystemTest.scala   |  2 +-
 .../table/descriptors/TableDescriptorTest.scala    |  2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  6 +-
 6 files changed, 77 insertions(+), 86 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java
new file mode 100644
index 0000000..291c0f9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static 
org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+
+/**
+ * Connector descriptor for a file system.
+ */
+@PublicEvolving
+public class FileSystem extends ConnectorDescriptor {
+
+       private String path = null;
+
+       public FileSystem() {
+               super(CONNECTOR_TYPE_VALUE, 1, true);
+       }
+
+       /**
+        * Sets the path to a file or directory in a file system.
+        *
+        * @param path the path to a file or directory
+        */
+       public FileSystem path(String path) {
+               this.path = path;
+               return this;
+       }
+
+       @Override
+       protected Map<String, String> toConnectorProperties() {
+               DescriptorProperties properties = new DescriptorProperties();
+               if (path != null) {
+                       properties.putString(CONNECTOR_PATH, path);
+               }
+               return properties.asMap();
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
similarity index 54%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
index 4d1b7de..d0e2e1b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
@@ -16,26 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
 
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
-  * Validator for [[FileSystem]].
-  */
-class FileSystemValidator extends ConnectorDescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    super.validate(properties)
-    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false)
-    properties.validateString(CONNECTOR_PATH, false, 1)
-  }
-}
-
-object FileSystemValidator {
+ * Validator for {@link FileSystem}.
+ */
+@PublicEvolving
+public class FileSystemValidator extends ConnectorDescriptorValidator {
 
-  val CONNECTOR_TYPE_VALUE = "filesystem"
-  val CONNECTOR_PATH = "connector.path"
+       public static final String CONNECTOR_TYPE_VALUE = "filesystem";
+       public static final String CONNECTOR_PATH = "connector.path";
 
+       @Override
+       public void validate(DescriptorProperties properties) {
+               super.validate(properties);
+               properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, 
false);
+               properties.validateString(CONNECTOR_PATH, false, 1);
+       }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
deleted file mode 100644
index 77cf27b..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
-
-/**
-  * Connector descriptor for a file system.
-  */
-class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, 1, true) {
-
-  private var path: Option[String] = None
-
-  /**
-    * Sets the path to a file or directory in a file system.
-    *
-    * @param path the path a file or directory
-    */
-  def path(path: String): FileSystem = {
-    this.path = Some(path)
-    this
-  }
-
-  override protected def toConnectorProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-
-    path.foreach(properties.putString(CONNECTOR_PATH, _))
-
-    properties.asMap()
-  }
-}
-
-/**
-  * Connector descriptor for a file system.
-  */
-object FileSystem {
-
-  /**
-    * Connector descriptor for a file system.
-    *
-    * @deprecated Use `new FileSystem()`.
-    */
-  @deprecated
-  def apply(): FileSystem = new FileSystem()
-  
-}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
index 1162694..d232a0d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -40,7 +40,7 @@ class FileSystemTest extends DescriptorTestBase {
   // 
----------------------------------------------------------------------------------------------
 
   override def descriptors(): util.List[Descriptor] = {
-    util.Arrays.asList(FileSystem().path("/myfile"))
+    util.Arrays.asList(new FileSystem().path("/myfile"))
   }
 
   override def validator(): DescriptorValidator = {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index df4d3fc..555a030 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -56,7 +56,7 @@ class TableDescriptorTest extends TableTestBase {
     //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
     //}
 
-    val connector = FileSystem()
+    val connector = new FileSystem()
       .path("/path/to/csv")
 
     val format = OldCsv()
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 0d633fe..b5ada5d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -67,7 +67,7 @@ object CommonTestData {
     )
     val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), 
"csv-test1", "tmp")
 
-    val connDesc1 = FileSystem().path(tempFilePath1)
+    val connDesc1 = new FileSystem().path(tempFilePath1)
     val formatDesc1 = OldCsv()
       .field("a", Types.INT)
       .field("b", Types.LONG)
@@ -106,7 +106,7 @@ object CommonTestData {
     )
     val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), 
"csv-test2", "tmp")
 
-    val connDesc2 = FileSystem().path(tempFilePath2)
+    val connDesc2 = new FileSystem().path(tempFilePath2)
     val formatDesc2 = OldCsv()
       .field("d", Types.INT)
       .field("e", Types.LONG)
@@ -131,7 +131,7 @@ object CommonTestData {
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
-    val connDesc3 = FileSystem().path(tempFilePath3)
+    val connDesc3 = new FileSystem().path(tempFilePath3)
     val formatDesc3 = OldCsv()
       .field("x", Types.INT)
       .field("y", Types.LONG)

Reply via email to