This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abfaef  [FLINK-12988][table] restore AbstractCatalogTable as parent 
for CatalogTableImpl and ConnectorCatalogTable
0abfaef is described below

commit 0abfaef842987367ceef41f5804e2fccee853e27
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 15:24:29 2019 -0700

    [FLINK-12988][table] restore AbstractCatalogTable as parent for 
CatalogTableImpl and ConnectorCatalogTable
    
    This PR adds back AbstractCatalogTable from FLINK-12918 to reflect the 
correct inheritance structure of CatalogTableImpl and ConnectorCatalogTable. 
But the unification of catalog table implementations remains untouched.
    
    This closes #8883.
---
 ...logTableImpl.java => AbstractCatalogTable.java} | 47 +++-----------------
 .../flink/table/catalog/CatalogTableImpl.java      | 50 +++-------------------
 .../flink/table/catalog/ConnectorCatalogTable.java |  2 +-
 3 files changed, 13 insertions(+), 86 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
similarity index 65%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index c608ce9..d586b5d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -19,22 +19,17 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Schema;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An abstract catalog table.
  */
-public class CatalogTableImpl implements CatalogTable {
+public abstract class AbstractCatalogTable implements CatalogTable {
        // Schema of the table (column names and types)
        private final TableSchema tableSchema;
        // Partition keys if this is a partitioned table. It's an empty set if 
the table is not partitioned
@@ -44,14 +39,14 @@ public class CatalogTableImpl implements CatalogTable {
        // Comment of the table
        private final String comment;
 
-       public CatalogTableImpl(
-               TableSchema tableSchema,
-               Map<String, String> properties,
-               String comment) {
+       public AbstractCatalogTable(
+                       TableSchema tableSchema,
+                       Map<String, String> properties,
+                       String comment) {
                this(tableSchema, new ArrayList<>(), properties, comment);
        }
 
-       public CatalogTableImpl(
+       public AbstractCatalogTable(
                        TableSchema tableSchema,
                        List<String> partitionKeys,
                        Map<String, String> properties,
@@ -86,34 +81,4 @@ public class CatalogTableImpl implements CatalogTable {
        public String getComment() {
                return comment;
        }
-
-       @Override
-       public CatalogBaseTable copy() {
-               return new CatalogTableImpl(
-                       getSchema().copy(), new 
ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
-       }
-
-       @Override
-       public Optional<String> getDescription() {
-               return Optional.of(getComment());
-       }
-
-       @Override
-       public Optional<String> getDetailedDescription() {
-               return Optional.of("This is a catalog table in an im-memory 
catalog");
-       }
-
-       @Override
-       public Map<String, String> toProperties() {
-               DescriptorProperties descriptor = new DescriptorProperties();
-
-               descriptor.putTableSchema(Schema.SCHEMA, getSchema());
-
-               Map<String, String> properties = new HashMap<>(getProperties());
-               properties.remove(CatalogConfig.IS_GENERIC);
-
-               descriptor.putProperties(properties);
-
-               return descriptor.asMap();
-       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index c608ce9..df15a95 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -29,25 +29,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * An abstract catalog table.
+ * A catalog table implementation.
  */
-public class CatalogTableImpl implements CatalogTable {
-       // Schema of the table (column names and types)
-       private final TableSchema tableSchema;
-       // Partition keys if this is a partitioned table. It's an empty set if 
the table is not partitioned
-       private final List<String> partitionKeys;
-       // Properties of the table
-       private final Map<String, String> properties;
-       // Comment of the table
-       private final String comment;
+public class CatalogTableImpl extends AbstractCatalogTable {
 
        public CatalogTableImpl(
-               TableSchema tableSchema,
-               Map<String, String> properties,
-               String comment) {
+                       TableSchema tableSchema,
+                       Map<String, String> properties,
+                       String comment) {
                this(tableSchema, new ArrayList<>(), properties, comment);
        }
 
@@ -56,35 +46,7 @@ public class CatalogTableImpl implements CatalogTable {
                        List<String> partitionKeys,
                        Map<String, String> properties,
                        String comment) {
-               this.tableSchema = checkNotNull(tableSchema, "tableSchema 
cannot be null");
-               this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys 
cannot be null");
-               this.properties = checkNotNull(properties, "properties cannot 
be null");
-               this.comment = comment;
-       }
-
-       @Override
-       public boolean isPartitioned() {
-               return !partitionKeys.isEmpty();
-       }
-
-       @Override
-       public List<String> getPartitionKeys() {
-               return partitionKeys;
-       }
-
-       @Override
-       public Map<String, String> getProperties() {
-               return properties;
-       }
-
-       @Override
-       public TableSchema getSchema() {
-               return tableSchema;
-       }
-
-       @Override
-       public String getComment() {
-               return comment;
+               super(tableSchema, partitionKeys, properties, comment);
        }
 
        @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 08b3bd5..84b6e0d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -46,7 +46,7 @@ import java.util.stream.Collectors;
  * @param <T2> type of the expected elements by the {@link TableSink}
  */
 @Internal
-public class ConnectorCatalogTable<T1, T2> extends CatalogTableImpl {
+public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
        private final TableSource<T1> tableSource;
        private final TableSink<T2> tableSink;
        private final boolean isBatch;

Reply via email to