This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new b1b0891  [FLINK-13313][table] create CatalogTableBuilder to support 
building CatalogTable from descriptors
b1b0891 is described below

commit b1b08916a7f0a93fb36291cce34580be419c7868
Author: bowen.li <[email protected]>
AuthorDate: Thu Jul 18 20:52:52 2019 -0700

    [FLINK-13313][table] create CatalogTableBuilder to support building 
CatalogTable from descriptors
    
    This PR adds CatalogTableBuilder as a replacement for 
ExternalCatalogTableBuilder to help users convert table source/sink descriptors 
to CatalogTable. The gap was discovered mainly while writing tests for 
HiveCatalog to make sure it persists Flink generic tables as expected.
    
    This closes #9172.
---
 flink-connectors/flink-connector-hive/pom.xml      |   8 +
 .../table/catalog/hive/HiveCatalogITCase.java      | 170 ++++++++++++++++++++
 .../src/test/resources/csv/test.csv                |   3 +
 .../flink/table/catalog/CatalogTableBuilder.java   | 173 +++++++++++++++++++++
 .../flink/table/catalog/config/CatalogConfig.java  |   3 +
 ...criptor.java => ConnectorFormatDescriptor.java} |   9 +-
 .../table/descriptors/SchematicDescriptor.java     |   6 +-
 .../catalog/ExternalCatalogTableBuilder.scala      |   3 +
 8 files changed, 365 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml 
b/flink-connectors/flink-connector-hive/pom.xml
index de2fa39..b3d7ea2 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -455,6 +455,14 @@ under the License.
                        </exclusions>
         </dependency>
 
+               <!-- TODO: move to flink-connector-hive-test end-to-end test 
module once it's set up -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-csv</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
new file mode 100644
index 0000000..6f437df
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.FileSystem;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.OldCsv;
+import org.apache.flink.types.Row;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * IT case for HiveCatalog.
+ * TODO: move to flink-connector-hive-test end-to-end test module once it's 
set up
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class HiveCatalogITCase {
+
+       @HiveSQL(files = {})
+       private static HiveShell hiveShell;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static HiveCatalog hiveCatalog;
+       private static HiveConf hiveConf;
+
+       private String sourceTableName = "csv_source";
+       private String sinkTableName = "csv_sink";
+
+       @BeforeClass
+       public static void createCatalog() {
+               hiveConf = hiveShell.getHiveConf();
+               hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+               hiveCatalog.open();
+       }
+
+       @AfterClass
+       public static void closeCatalog() {
+               if (hiveCatalog != null) {
+                       hiveCatalog.close();
+               }
+       }
+
+       @Test
+       public void testGenericTable() throws Exception {
+               ExecutionEnvironment execEnv = 
ExecutionEnvironment.createLocalEnvironment(1);
+               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(execEnv);
+
+               tableEnv.registerCatalog("myhive", hiveCatalog);
+
+               TableSchema schema = TableSchema.builder()
+                       .field("name", DataTypes.STRING())
+                       .field("age", DataTypes.INT())
+                       .build();
+
+               FormatDescriptor format = new OldCsv()
+                       .field("name", Types.STRING())
+                       .field("age", Types.INT());
+
+               CatalogTable source =
+                       new CatalogTableBuilder(
+                               new 
FileSystem().path(this.getClass().getResource("/csv/test.csv").getPath()),
+                               schema)
+                       .withFormat(format)
+                       .inAppendMode()
+                       .withComment(null)
+                       .build();
+
+               Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), 
"test.csv");
+
+               CatalogTable sink =
+                       new CatalogTableBuilder(
+                               new 
FileSystem().path(p.toAbsolutePath().toString()),
+                               schema)
+                               .withFormat(format)
+                               .inAppendMode()
+                               .withComment(null)
+                               .build();
+
+               hiveCatalog.createTable(
+                       new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
+                       source,
+                       false
+               );
+
+               hiveCatalog.createTable(
+                       new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+                       sink,
+                       false
+               );
+
+               Table t = tableEnv.sqlQuery(
+                       String.format("select * from myhive.`default`.%s", 
sourceTableName));
+
+               List<Row> result = tableEnv.toDataSet(t, Row.class).collect();
+
+               // assert query result
+               assertEquals(
+                       Arrays.asList(
+                               Row.of("1", 1),
+                               Row.of("2", 2),
+                               Row.of("3", 3)),
+                       result
+               );
+
+               tableEnv.sqlUpdate(
+                       String.format("insert into myhive.`default`.%s select * 
from myhive.`default`.%s",
+                               sinkTableName,
+                               sourceTableName));
+               tableEnv.execute("myjob");
+
+               // assert written result
+               File resultFile = new File(p.toAbsolutePath().toString());
+               BufferedReader reader = new BufferedReader(new 
FileReader(resultFile));
+               String readLine;
+               for (int i = 0; i < 3; i++) {
+                       readLine = reader.readLine();
+                       assertEquals(String.format("%d,%d", i + 1, i + 1), 
readLine);
+               }
+
+               // No more lines
+               assertNull(reader.readLine());
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv 
b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
new file mode 100644
index 0000000..9b5b3ac
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
@@ -0,0 +1,3 @@
+1,1
+2,2
+3,3
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
new file mode 100644
index 0000000..01f46bd
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.ConnectorFormatDescriptor;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Metadata;
+import org.apache.flink.table.descriptors.Statistics;
+import org.apache.flink.table.descriptors.StreamableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder for creating a {@link CatalogTable}.
+ *
+ * <p>It takes {@link Descriptor}s which allow for declaring the communication 
to external
+ * systems in an implementation-agnostic way. The classpath is scanned for 
suitable table
+ * factories that match the desired configuration.
+ *
+ * <p>Use the provided builder methods to configure the catalog table 
accordingly.
+ *
+ * <p>The following example shows how to describe a connector using a JSON 
format and
+ * build a catalog table from it:
+ *
+ * <pre>{@code
+ * CatalogTable table = new CatalogTableBuilder(
+ *       new ExternalSystemXYZ()
+ *         .version("0.11"),
+ *       new TableSchema.Builder()
+ *                .fields(names, dataTypes)
+ *                .build())
+ *   .withFormat(
+ *     new Json()
+ *       .jsonSchema("{...}")
+ *       .failOnMissingField(false))
+ *   .withComment("test comment")
+ *   .build()
+ * }</pre>
+ */
+@PublicEvolving
+public class CatalogTableBuilder
+               extends TableDescriptor
+               implements ConnectorFormatDescriptor<CatalogTableBuilder>, 
StreamableDescriptor<CatalogTableBuilder> {
+
+       private final ConnectorDescriptor connectorDescriptor;
+       private final TableSchema tableSchema;
+       private final boolean isGeneric;
+
+       private String comment;
+
+       private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
+       private Optional<Statistics> statisticsDescriptor = Optional.empty();
+       private Optional<Metadata> metadataDescriptor = Optional.empty();
+       private Optional<String> updateMode = Optional.empty();
+       private Map<String, String> properties = Collections.emptyMap();
+
+       /**
+        * Creates a builder for a generic catalog table from a connector descriptor and a table schema.
+        * @param connectorDescriptor descriptor of the connector
+        * @param tableSchema schema of the table
+        */
+       public CatalogTableBuilder(ConnectorDescriptor connectorDescriptor, 
TableSchema tableSchema) {
+               this.connectorDescriptor = checkNotNull(connectorDescriptor);
+               this.tableSchema = checkNotNull(tableSchema);
+
+               // Non-generic tables are not supported currently
+               this.isGeneric = true;
+       }
+
+       @Override
+       public CatalogTableBuilder withFormat(FormatDescriptor format) {
+               this.formatDescriptor = Optional.of(checkNotNull(format));
+               return this;
+       }
+
+       @Override
+       public CatalogTableBuilder inAppendMode() {
+               updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
+               return this;
+       }
+
+       @Override
+       public CatalogTableBuilder inRetractMode() {
+               updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
+               return this;
+       }
+
+       @Override
+       public CatalogTableBuilder inUpsertMode() {
+               updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
+               return this;
+       }
+
+       public CatalogTableBuilder withComment(String comment) {
+               this.comment = comment;
+               return this;
+       }
+
+       public CatalogTableBuilder withProperties(Map<String, String> 
properties) {
+               this.properties = checkNotNull(properties);
+               return this;
+       }
+
+       /**
+        * Build a {@link CatalogTable}.
+        *
+        * @return catalog table
+        */
+       public CatalogTable build() {
+               return new CatalogTableImpl(
+                       tableSchema,
+                       toProperties(),
+                       comment);
+       }
+
+       @Override
+       public Map<String, String> toProperties() {
+               DescriptorProperties descriptorProperties = new 
DescriptorProperties();
+               
descriptorProperties.putProperties(connectorDescriptor.toProperties());
+
+               if (formatDescriptor.isPresent()) {
+                       
descriptorProperties.putProperties(formatDescriptor.get().toProperties());
+               }
+
+               if (statisticsDescriptor.isPresent()) {
+                       
descriptorProperties.putProperties(statisticsDescriptor.get().toProperties());
+               }
+
+               if (metadataDescriptor.isPresent()) {
+                       
descriptorProperties.putProperties(metadataDescriptor.get().toProperties());
+               }
+
+               if (updateMode.isPresent()) {
+                       descriptorProperties.putString(UPDATE_MODE, 
updateMode.get());
+               }
+
+               descriptorProperties.putProperties(this.properties);
+               descriptorProperties.putString(CatalogConfig.IS_GENERIC, 
String.valueOf(isGeneric));
+
+               return descriptorProperties.asMap();
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 7a4a624..9f23a7f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -23,6 +23,9 @@ package org.apache.flink.table.catalog.config;
  */
 public class CatalogConfig {
 
+       /**
+        * Flag to distinguish whether a meta-object is a generic Flink object or not.
+        */
        public static final String IS_GENERIC = "is_generic";
 
        // Globally reserved prefix for catalog properties.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
similarity index 80%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
index 2f9a5db..7397d39 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
@@ -21,18 +21,13 @@ package org.apache.flink.table.descriptors;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * An interface for descriptors that allow to define a format and schema.
+ * An interface for descriptors that allow defining a format.
  */
 @PublicEvolving
-public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends 
Descriptor {
+public interface ConnectorFormatDescriptor<D extends 
ConnectorFormatDescriptor<D>> extends Descriptor {
 
        /**
         * Specifies the format that defines how to read data from a connector.
         */
        D withFormat(FormatDescriptor format);
-
-       /**
-        * Specifies the resulting table schema.
-        */
-       D withSchema(Schema schema);
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
index 2f9a5db..121e49c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.descriptors;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 /**
  * An interface for descriptors that allow to define a format and schema.
+ *
+ * @deprecated use {@link ConnectorFormatDescriptor}.
  */
-@PublicEvolving
+@Deprecated
 public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends 
Descriptor {
 
        /**
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
index ae5c677..52bfa06 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
@@ -52,7 +52,10 @@ import org.apache.flink.table.descriptors._
   * }}}
   *
   * @param connectorDescriptor Connector descriptor describing the external 
system
+  *
+  * @deprecated use [[CatalogTableBuilder]]
   */
+@Deprecated
 @deprecated
 class ExternalCatalogTableBuilder(private val connectorDescriptor: 
ConnectorDescriptor)
   extends TableDescriptor

Reply via email to