This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ff00d  [FLINK-13021][table][hive] unify catalog partition implementations
59ff00d is described below

commit 59ff00d71d298fa61a92efa4fecd46f3cefc50f6
Author: bowen.li <[email protected]>
AuthorDate: Mon Jul 1 12:31:47 2019 -0700

    [FLINK-13021][table][hive] unify catalog partition implementations
    
    This PR unifies catalog partition implementations.
    
    This closes #8926.
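    
    For reference, a minimal usage sketch of the unified partition class that this
    change introduces (not part of the commit itself). CatalogPartitionImpl and
    HivePartitionConfig.PARTITION_LOCATION are taken from the diff below; the class
    name PartitionSketch, the method name, the location path and the comment string
    are made-up examples:
    
        import org.apache.flink.table.catalog.CatalogPartition;
        import org.apache.flink.table.catalog.CatalogPartitionImpl;
        import org.apache.flink.table.catalog.hive.HivePartitionConfig;
        
        import java.util.HashMap;
        import java.util.Map;
        
        public class PartitionSketch {
            public static CatalogPartition hivePartition() {
                Map<String, String> props = new HashMap<>();
                // the partition location now travels as a plain property
                // instead of a field on a Hive-specific subclass
                props.put(HivePartitionConfig.PARTITION_LOCATION, "/warehouse/db/t/dt=2019-07-01");
                return new CatalogPartitionImpl(props, "example comment");
            }
        }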
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 54 ++++++++++++-------
 .../table/catalog/hive/HiveCatalogConfig.java      | 17 ++----
 .../table/catalog/hive/HiveCatalogPartition.java   | 61 ----------------------
 .../table/catalog/hive/HivePartitionConfig.java    | 17 ++----
 .../connectors/hive/HiveTableOutputFormatTest.java |  9 ++--
 .../batch/connectors/hive/HiveTableSinkTest.java   |  8 +--
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 19 -------
 flink-python/pyflink/table/catalog.py              | 26 +--------
 flink-python/pyflink/table/tests/test_catalog.py   |  8 +--
 ...logPartition.java => CatalogPartitionImpl.java} | 23 ++++++--
 .../table/catalog/GenericCatalogPartition.java     | 52 ------------------
 .../flink/table/catalog/CatalogTestBase.java       |  5 ++
 .../table/catalog/GenericInMemoryCatalogTest.java  | 26 ---------
 .../flink/table/catalog/CatalogPartition.java      |  7 +++
 .../apache/flink/table/catalog/CatalogTest.java    | 40 +++-----------
 .../flink/table/catalog/CatalogTestUtil.java       | 13 +++++
 16 files changed, 111 insertions(+), 274 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8659a80..03ddceb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,15 +29,14 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
-import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.config.CatalogTableConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -483,7 +482,7 @@ public class HiveCatalog extends AbstractCatalog {
                if (isGeneric) {
                        properties = retrieveFlinkProperties(properties);
                }
-               String comment = properties.remove(CatalogTableConfig.TABLE_COMMENT);
+               String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
                // Table schema
                TableSchema tableSchema =
@@ -515,7 +514,7 @@ public class HiveCatalog extends AbstractCatalog {
 
                Map<String, String> properties = new HashMap<>(table.getProperties());
                // Table comment
-               properties.put(CatalogTableConfig.TABLE_COMMENT, table.getComment());
+               properties.put(HiveCatalogConfig.COMMENT, table.getComment());
 
                boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
 
@@ -623,8 +622,10 @@ public class HiveCatalog extends AbstractCatalog {
                checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
                checkNotNull(partition, "Partition cannot be null");
 
-               if (!(partition instanceof HiveCatalogPartition)) {
-                       throw new CatalogException("Currently only supports HiveCatalogPartition");
+               boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+               if (isGeneric) {
+                       throw new CatalogException("Currently only supports non-generic CatalogPartition");
                }
 
                Table hiveTable = getHiveTable(tablePath);
@@ -715,7 +716,14 @@ public class HiveCatalog extends AbstractCatalog {
 
                try {
                        Partition hivePartition = getHivePartition(tablePath, partitionSpec);
-                       return instantiateCatalogPartition(hivePartition);
+
+                       Map<String, String> properties = hivePartition.getParameters();
+
+                       properties.put(HivePartitionConfig.PARTITION_LOCATION, hivePartition.getSd().getLocation());
+
+                       String comment = properties.remove(HiveCatalogConfig.COMMENT);
+
+                       return new CatalogPartitionImpl(properties, comment);
                } catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
                        throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
                } catch (TException e) {
@@ -731,8 +739,10 @@ public class HiveCatalog extends AbstractCatalog {
                checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
                checkNotNull(newPartition, "New partition cannot be null");
 
-               if (!(newPartition instanceof HiveCatalogPartition)) {
-                       throw new CatalogException("Currently only supports HiveCatalogPartition");
+               boolean isGeneric = Boolean.valueOf(newPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+               if (isGeneric) {
+                       throw new CatalogException("Currently only supports non-generic CatalogPartition");
                }
 
                // Explicitly check if the partition exists or not
@@ -771,11 +781,12 @@ public class HiveCatalog extends AbstractCatalog {
 
        // make sure both table and partition are generic, or neither is
        private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
-               boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
-               if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
-                       (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
+               boolean tableIsGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
+               boolean partitionIsGeneric = Boolean.valueOf(catalogPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+               if (tableIsGeneric != partitionIsGeneric) {
                        throw new CatalogException(String.format("Cannot handle %s partition for %s table",
-                               catalogPartition.getClass().getName(), isGeneric ? "generic" : "non-generic"));
+                               catalogPartition.getClass().getName(), tableIsGeneric ? "generic" : "non-generic"));
                }
        }
 
@@ -792,15 +803,18 @@ public class HiveCatalog extends AbstractCatalog {
                        }
                }
                // TODO: handle GenericCatalogPartition
-               HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
                StorageDescriptor sd = hiveTable.getSd().deepCopy();
-               sd.setLocation(hiveCatalogPartition.getLocation());
-               return HiveTableUtil.createHivePartition(hiveTable.getDbName(), hiveTable.getTableName(), partValues,
-                               sd, hiveCatalogPartition.getProperties());
-       }
+               sd.setLocation(catalogPartition.getProperties().remove(HivePartitionConfig.PARTITION_LOCATION));
+
+               Map<String, String> properties = new HashMap<>(catalogPartition.getProperties());
+               properties.put(HiveCatalogConfig.COMMENT, catalogPartition.getComment());
 
-       private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
-               return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+               return HiveTableUtil.createHivePartition(
+                               hiveTable.getDbName(),
+                               hiveTable.getTableName(),
+                               partValues,
+                               sd,
+                               properties);
        }
 
        private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
similarity index 63%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
index 6d0c514..684f50e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogConfig.java
@@ -16,21 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Config for {@link CatalogBaseTable}.
+ * Configs for catalog meta-objects in {@link HiveCatalog}.
  */
-public class CatalogTableConfig {
-
-       // Comment of catalog table
-       public static final String TABLE_COMMENT = "comment";
-
-       // Partition keys of catalog table
-       public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HiveCatalogConfig {
 
-       // Prefix for properties of catalog table
-       public static final String TABLE_PROPERTIES = "properties";
+       public static final String COMMENT = "comment";
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
deleted file mode 100644
index 98b13a2..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.AbstractCatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A CatalogPartition implementation that represents a Partition in Hive.
- */
-public class HiveCatalogPartition extends AbstractCatalogPartition {
-       private final String location;
-
-       public HiveCatalogPartition(Map<String, String> properties, String location) {
-               super(properties, null);
-               this.location = location;
-       }
-
-       public HiveCatalogPartition(Map<String, String> properties) {
-               this(properties, null);
-       }
-
-       public String getLocation() {
-               return location;
-       }
-
-       @Override
-       public CatalogPartition copy() {
-               return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
-       }
-
-       @Override
-       public Optional<String> getDescription() {
-               return Optional.empty();
-       }
-
-       @Override
-       public Optional<String> getDetailedDescription() {
-               return Optional.empty();
-       }
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
similarity index 63%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
index 6d0c514..0551b72 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HivePartitionConfig.java
@@ -16,21 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
-
-import org.apache.flink.table.catalog.CatalogBaseTable;
+package org.apache.flink.table.catalog.hive;
 
 /**
- * Config for {@link CatalogBaseTable}.
+ * Configs for partition in {@link HiveCatalog}.
  */
-public class CatalogTableConfig {
-
-       // Comment of catalog table
-       public static final String TABLE_COMMENT = "comment";
-
-       // Partition keys of catalog table
-       public static final String TABLE_PARTITION_KEYS = "partition-keys";
+public class HivePartitionConfig {
+       public static final String PARTITION_LOCATION = "partition.location";
 
-       // Prefix for properties of catalog table
-       public static final String TABLE_PROPERTIES = "properties";
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index 206f831..89dfc8a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.types.Row;
 
@@ -128,9 +129,11 @@ public class HiveTableOutputFormatTest {
 
                // make sure new partition is created
                assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-               HiveCatalogPartition catalogPartition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
+               CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
                                partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
-               verifyWrittenData(new Path(catalogPartition.getLocation(), "0"), toWrite, 1);
+
+               String partitionLocation = catalogPartition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+               verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
 
                hiveCatalog.dropTable(tablePath, false);
        }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 6b25adb..55f5336 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,12 +23,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
+import org.apache.flink.table.catalog.hive.HivePartitionConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -121,8 +122,9 @@ public class HiveTableSinkTest {
                List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
                assertEquals(toWrite.size(), partitionSpecs.size());
                for (int i = 0; i < toWrite.size(); i++) {
-                       HiveCatalogPartition partition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
-                       verifyWrittenData(new Path(partition.getLocation(), "0"), Collections.singletonList(toWrite.get(i)), 1);
+                       CatalogPartition partition = hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
+                       String partitionLocation = partition.getProperties().get(HivePartitionConfig.PARTITION_LOCATION);
+                       verifyWrittenData(new Path(partitionLocation, "0"), Collections.singletonList(toWrite.get(i)), 1);
                }
 
                hiveCatalog.dropTable(tablePath, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index e641e23..c10d31c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -43,9 +42,7 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Test for HiveCatalog on Hive metadata.
@@ -137,20 +134,4 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
                        "Hive table cannot be streaming."
                );
        }
-
-       @Override
-       public CatalogPartition createPartition() {
-               return new HiveCatalogPartition(getBatchTableProperties());
-       }
-
-       @Override
-       protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
-               assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
-               assertEquals(expected.getClass(), actual.getClass());
-               HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
-               HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
-               assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
-               assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
-               assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
-       }
 }
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index d283d16..2748d77 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -23,8 +23,7 @@ from pyflink.table.table_schema import TableSchema
 
 __all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
            'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
-           'CatalogColumnStatistics', 'HiveCatalog',
-           'HiveCatalogPartition']
+           'CatalogColumnStatistics', 'HiveCatalog']
 
 
 class Catalog(object):
@@ -695,11 +694,7 @@ class CatalogPartition(object):
 
     @staticmethod
     def _get(j_catalog_partition):
-        if j_catalog_partition.getClass().getName() == \
-                "org.apache.flink.table.catalog.hive.HiveCatalogPartition":
-            return HiveCatalogPartition(j_hive_catalog_partition=j_catalog_partition)
-        else:
-            return CatalogPartition(j_catalog_partition)
+        return CatalogPartition(j_catalog_partition)
 
     def get_properties(self):
         """
@@ -973,20 +968,3 @@ class HiveCatalog(Catalog):
             j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
                 catalog_name, default_database, hive_site_url)
         super(HiveCatalog, self).__init__(j_hive_catalog)
-
-
-class HiveCatalogPartition(CatalogPartition):
-    """
-    A CatalogPartition implementation that represents a Partition in Hive.
-    """
-
-    def __int__(self, properties=None, location=None, j_hive_catalog_partition=None):
-        gateway = get_gateway()
-        if j_hive_catalog_partition is None:
-            j_hive_catalog_partition = \
-                gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogPartition(
-                    properties, location)
-        super(HiveCatalogPartition, self).__init__(j_hive_catalog_partition)
-
-    def get_location(self):
-        return self._j_catalog_partition.getLocation()
diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py
index 5a340f7..81ea53a 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -204,8 +204,8 @@ class CatalogTestBase(PyFlinkTestCase):
     @staticmethod
     def create_partition():
         gateway = get_gateway()
-        j_partition = gateway.jvm.GenericCatalogPartition(
-            CatalogTestBase.get_batch_table_properties(), "Generic batch table")
+        j_partition = gateway.jvm.CatalogPartitionImpl(
+            CatalogTestBase.get_batch_table_properties(), "catalog partition tests")
         return CatalogPartition(j_partition)
 
     @staticmethod
@@ -808,8 +808,8 @@ class CatalogTestBase(PyFlinkTestCase):
         self.assertIsNone(cp.get_properties().get("k"))
 
         gateway = get_gateway()
-        j_partition = gateway.jvm.GenericCatalogPartition(
-            {"is_streaming": "false", "k": "v"}, "Generic batch table")
+        j_partition = gateway.jvm.CatalogPartitionImpl(
+            {"is_streaming": "false", "k": "v"}, "catalog partition")
         another = CatalogPartition(j_partition)
         self.catalog.alter_partition(self.path1, self.create_partition_spec(), another, False)
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
similarity index 69%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
index 818d72a..f082cdc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogPartitionImpl.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.table.catalog;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An abstract catalog partition implementation.
+ * A catalog partition implementation.
  */
-public abstract class AbstractCatalogPartition implements CatalogPartition {
+public class CatalogPartitionImpl implements CatalogPartition {
        private final Map<String, String> properties;
        private final String comment;
 
-       public AbstractCatalogPartition(Map<String, String> properties, String comment) {
+       public CatalogPartitionImpl(Map<String, String> properties, String comment) {
                this.properties = checkNotNull(properties, "properties cannot be null");
                this.comment = comment;
        }
@@ -39,8 +41,23 @@ public abstract class AbstractCatalogPartition implements CatalogPartition {
                return properties;
        }
 
+       @Override
        public String getComment() {
                return comment;
        }
 
+       @Override
+       public CatalogPartition copy() {
+               return new CatalogPartitionImpl(new HashMap<>(properties), comment);
+       }
+
+       @Override
+       public Optional<String> getDescription() {
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<String> getDetailedDescription() {
+               return Optional.empty();
+       }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
deleted file mode 100644
index 1a4563f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.catalog.config.CatalogConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A generic catalog partition implementation.
- */
-public class GenericCatalogPartition extends AbstractCatalogPartition {
-
-       public GenericCatalogPartition(Map<String, String> properties, String comment) {
-               super(properties, comment);
-               properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
-       }
-
-       @Override
-       public CatalogPartition copy() {
-               return new GenericCatalogPartition(new HashMap<>(getProperties()), getComment());
-       }
-
-       @Override
-       public Optional<String> getDescription() {
-               return Optional.of(getComment());
-       }
-
-       @Override
-       public Optional<String> getDetailedDescription() {
-               return Optional.of("This is a generic catalog partition with detailed description");
-       }
-
-}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 801914f..19e9123 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -96,6 +96,11 @@ public abstract class CatalogTestBase extends CatalogTest {
        }
 
        @Override
+       public CatalogPartition createPartition() {
+               return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
+       }
+
+       @Override
        public CatalogView createView() {
                return new CatalogViewImpl(
                        String.format("select * from %s", t1),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 0014501..45c511b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -88,26 +87,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
                assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
        }
 
-       // ------ partitions ------
-
-       @Test
-       public void testAlterPartition_differentTypedPartition() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               CatalogPartition partition = createPartition();
-               catalog.createPartition(path1, partitionSpec, partition, false);
-
-               exception.expect(CatalogException.class);
-               exception.expectMessage(
-                       String.format("Partition types don't match. " +
-                               "Existing partition is '%s' and " +
-                               "new partition is 'org.apache.flink.table.catalog.CatalogTest$TestPartition'.",
-                               partition.getClass().getName()));
-               catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
-       }
-
        // ------ statistics ------
 
        @Test
@@ -156,11 +135,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
                return true;
        }
 
-       @Override
-       public CatalogPartition createPartition() {
-               return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
-       }
-
        private CatalogColumnStatistics createColumnStats() {
                CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
                CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
index 47dce25..7c36ed8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
@@ -34,6 +34,13 @@ public interface CatalogPartition {
        Map<String, String> getProperties();
 
        /**
+        * Get comment of the partition.
+        *
+        * @return comment of the partition
+        */
+       String getComment();
+
+       /**
         * Get a deep copy of the CatalogPartition instance.
         *
         * @return a copy of CatalogPartition instance
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 757a4c3..0c2b632 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -742,13 +742,13 @@ public abstract class CatalogTest {
 
                assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
                assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
-               checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+               CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
 
                catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
 
                assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
                assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
-               checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+               CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
        }
 
        @Test
@@ -891,16 +891,19 @@ public abstract class CatalogTest {
 
                assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
                CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
-               checkEquals(createPartition(), cp);
+               CatalogTestUtil.checkEquals(createPartition(), cp);
                assertNull(cp.getProperties().get("k"));
 
                CatalogPartition another = createPartition();
                another.getProperties().put("k", "v");
+
                catalog.alterPartition(path1, createPartitionSpec(), another, false);
 
                assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
                cp = catalog.getPartition(path1, createPartitionSpec());
-               checkEquals(another, cp);
+
+               CatalogTestUtil.checkEquals(another, cp);
                assertEquals("v", cp.getProperties().get("k"));
        }
 
@@ -1233,31 +1236,6 @@ public abstract class CatalogTest {
                }
        }
 
-       /**
-        * Test partition used to assert on partition of different class.
-        */
-       public static class TestPartition implements CatalogPartition {
-               @Override
-               public Map<String, String> getProperties() {
-                       return null;
-               }
-
-               @Override
-               public CatalogPartition copy() {
-                       return null;
-               }
-
-               @Override
-               public Optional<String> getDescription() {
-                       return Optional.empty();
-               }
-
-               @Override
-               public Optional<String> getDetailedDescription() {
-                       return Optional.empty();
-               }
-       }
-
        // ------ equality check utils ------
        // Can be overriden by sub test class
 
@@ -1266,10 +1244,6 @@ public abstract class CatalogTest {
                assertEquals(f1.getProperties(), f2.getProperties());
        }
 
-       protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
-               assertEquals(expected.getProperties(), actual.getProperties());
-       }
-
        protected void checkEquals(CatalogColumnStatistics cs1, CatalogColumnStatistics cs2) {
                CatalogTestUtil.checkEquals(cs1, cs2);
        }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 1c64025..53e4ed7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -80,6 +80,19 @@ public class CatalogTestUtil {
                }
        }
 
+       public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
+               assertEquals(p1.getClass(), p2.getClass());
+               assertEquals(p1.getComment(), p2.getComment());
+
+               // Hive tables may have properties created by itself
+               // thus properties of Hive table is a super set of those in its corresponding Flink table
+               if (Boolean.valueOf(p1.getProperties().get(CatalogConfig.IS_GENERIC))) {
+                       assertEquals(p1.getProperties(), p2.getProperties());
+               } else {
+                       assertTrue(p2.getProperties().entrySet().containsAll(p1.getProperties().entrySet()));
+               }
+       }
+
        public static void checkEquals(TableStats ts1, TableStats ts2) {
                assertEquals(ts1.getRowCount(), ts2.getRowCount());
                assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
