This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0abfaef [FLINK-12988][table] restore AbstractCatalogTable as parent
for CatalogTableImpl and ConnectorCatalogTable
0abfaef is described below
commit 0abfaef842987367ceef41f5804e2fccee853e27
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 15:24:29 2019 -0700
[FLINK-12988][table] restore AbstractCatalogTable as parent for
CatalogTableImpl and ConnectorCatalogTable
This PR adds back AbstractCatalogTable from FLINK-12918 to reflect the
correct inheritance structure of CatalogTableImpl and ConnectorCatalogTable.
But the unification of catalog table implementations remains untouched.
This closes #8883.
---
...logTableImpl.java => AbstractCatalogTable.java} | 47 +++-----------------
.../flink/table/catalog/CatalogTableImpl.java | 50 +++-------------------
.../flink/table/catalog/ConnectorCatalogTable.java | 2 +-
3 files changed, 13 insertions(+), 86 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
similarity index 65%
copy from
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
copy to
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index c608ce9..d586b5d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -19,22 +19,17 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Schema;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An abstract catalog table.
*/
-public class CatalogTableImpl implements CatalogTable {
+public abstract class AbstractCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final TableSchema tableSchema;
// Partition keys if this is a partitioned table. It's an empty set if
the table is not partitioned
@@ -44,14 +39,14 @@ public class CatalogTableImpl implements CatalogTable {
// Comment of the table
private final String comment;
- public CatalogTableImpl(
- TableSchema tableSchema,
- Map<String, String> properties,
- String comment) {
+ public AbstractCatalogTable(
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ String comment) {
this(tableSchema, new ArrayList<>(), properties, comment);
}
- public CatalogTableImpl(
+ public AbstractCatalogTable(
TableSchema tableSchema,
List<String> partitionKeys,
Map<String, String> properties,
@@ -86,34 +81,4 @@ public class CatalogTableImpl implements CatalogTable {
public String getComment() {
return comment;
}
-
- @Override
- public CatalogBaseTable copy() {
- return new CatalogTableImpl(
- getSchema().copy(), new
ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(getComment());
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a catalog table in an im-memory
catalog");
- }
-
- @Override
- public Map<String, String> toProperties() {
- DescriptorProperties descriptor = new DescriptorProperties();
-
- descriptor.putTableSchema(Schema.SCHEMA, getSchema());
-
- Map<String, String> properties = new HashMap<>(getProperties());
- properties.remove(CatalogConfig.IS_GENERIC);
-
- descriptor.putProperties(properties);
-
- return descriptor.asMap();
- }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index c608ce9..df15a95 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -29,25 +29,15 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
- * An abstract catalog table.
+ * A catalog table implementation.
*/
-public class CatalogTableImpl implements CatalogTable {
- // Schema of the table (column names and types)
- private final TableSchema tableSchema;
- // Partition keys if this is a partitioned table. It's an empty set if
the table is not partitioned
- private final List<String> partitionKeys;
- // Properties of the table
- private final Map<String, String> properties;
- // Comment of the table
- private final String comment;
+public class CatalogTableImpl extends AbstractCatalogTable {
public CatalogTableImpl(
- TableSchema tableSchema,
- Map<String, String> properties,
- String comment) {
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ String comment) {
this(tableSchema, new ArrayList<>(), properties, comment);
}
@@ -56,35 +46,7 @@ public class CatalogTableImpl implements CatalogTable {
List<String> partitionKeys,
Map<String, String> properties,
String comment) {
- this.tableSchema = checkNotNull(tableSchema, "tableSchema
cannot be null");
- this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys
cannot be null");
- this.properties = checkNotNull(properties, "properties cannot
be null");
- this.comment = comment;
- }
-
- @Override
- public boolean isPartitioned() {
- return !partitionKeys.isEmpty();
- }
-
- @Override
- public List<String> getPartitionKeys() {
- return partitionKeys;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Override
- public TableSchema getSchema() {
- return tableSchema;
- }
-
- @Override
- public String getComment() {
- return comment;
+ super(tableSchema, partitionKeys, properties, comment);
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 08b3bd5..84b6e0d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -46,7 +46,7 @@ import java.util.stream.Collectors;
* @param <T2> type of the expected elements by the {@link TableSink}
*/
@Internal
-public class ConnectorCatalogTable<T1, T2> extends CatalogTableImpl {
+public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
private final TableSource<T1> tableSource;
private final TableSink<T2> tableSink;
private final boolean isBatch;