This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1f2d72  [FLINK-12727][hive] Make HiveTableOutputFormat support 
writing partitioned tables
d1f2d72 is described below

commit d1f2d722588c1e979510b5de5c8c22dc51d126f1
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 4 20:38:47 2019 +0800

    [FLINK-12727][hive] Make HiveTableOutputFormat support writing partitioned 
tables
    
    This PR adds support for writing (both static and dynamic) partitioned 
tables in HiveTableOutputFormat.
    
    This closes #8614.
---
 .../connectors/hive/HiveTableOutputFormat.java     | 125 ++++++++++++++++-----
 .../flink/batch/connectors/hive/HiveTableUtil.java |  60 ----------
 .../flink/table/catalog/hive/HiveCatalog.java      |  16 +--
 .../table/catalog/hive/util/HiveTableUtil.java     |  45 ++++++++
 .../batch/connectors/hive/HiveInputFormatTest.java |   7 +-
 .../connectors/hive/HiveTableOutputFormatTest.java |  99 +++++++++++++---
 6 files changed, 231 insertions(+), 121 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 61179fa..5e15d72 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -40,7 +41,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -81,6 +86,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -98,17 +104,17 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        private static final long serialVersionUID = 5167529504848109023L;
 
        private transient JobConf jobConf;
-       private transient String dbName;
+       private transient String databaseName;
        private transient String tableName;
-       private transient List<String> partitionCols;
+       private transient List<String> partitionColumns;
        private transient RowTypeInfo rowTypeInfo;
        private transient HiveTablePartition hiveTablePartition;
-       private transient Properties tblProperties;
+       private transient Properties tableProperties;
        private transient boolean overwrite;
        private transient boolean isPartitioned;
        private transient boolean isDynamicPartition;
        // number of non-partitioning columns
-       private transient int numNonPartitionCols;
+       private transient int numNonPartitionColumns;
 
        private transient AbstractSerDe serializer;
        //StructObjectInspector represents the hive row structure.
@@ -117,32 +123,35 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        private transient TaskAttemptContext context;
 
        // Maps a partition dir name to the corresponding writer. Used for 
dynamic partitioning.
-       private transient Map<String, HivePartitionWriter> partitionToWriter;
+       private transient Map<String, HivePartitionWriter> partitionToWriter = 
new HashMap<>();
        // Writer for non-partitioned and static partitioned table
        private transient HivePartitionWriter staticWriter;
 
-       public HiveTableOutputFormat(JobConf jobConf, String dbName, String 
tableName, List<String> partitionCols,
+       // the offset of dynamic partition columns within a row
+       private transient int dynamicPartitionOffset;
+
+       public HiveTableOutputFormat(JobConf jobConf, String databaseName, 
String tableName, List<String> partitionColumns,
                                                                RowTypeInfo 
rowTypeInfo, HiveTablePartition hiveTablePartition,
-                                                               Properties 
tblProperties, boolean overwrite) {
+                                                               Properties 
tableProperties, boolean overwrite) {
                super(jobConf.getCredentials());
 
-               
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(dbName), "DB 
name is empty");
+               
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), 
"DB name is empty");
                
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(tableName), 
"Table name is empty");
                Preconditions.checkNotNull(rowTypeInfo, "RowTypeInfo cannot be 
null");
                Preconditions.checkNotNull(hiveTablePartition, 
"HiveTablePartition cannot be null");
-               Preconditions.checkNotNull(tblProperties, "Table properties 
cannot be null");
+               Preconditions.checkNotNull(tableProperties, "Table properties 
cannot be null");
 
                HadoopUtils.mergeHadoopConf(jobConf);
                this.jobConf = jobConf;
-               this.dbName = dbName;
+               this.databaseName = databaseName;
                this.tableName = tableName;
-               this.partitionCols = partitionCols;
+               this.partitionColumns = partitionColumns;
                this.rowTypeInfo = rowTypeInfo;
                this.hiveTablePartition = hiveTablePartition;
-               this.tblProperties = tblProperties;
+               this.tableProperties = tableProperties;
                this.overwrite = overwrite;
-               isPartitioned = partitionCols != null && 
!partitionCols.isEmpty();
-               isDynamicPartition = isPartitioned && partitionCols.size() > 
hiveTablePartition.getPartitionSpec().size();
+               isPartitioned = partitionColumns != null && 
!partitionColumns.isEmpty();
+               isDynamicPartition = isPartitioned && partitionColumns.size() > 
hiveTablePartition.getPartitionSpec().size();
        }
 
        //  Custom serialization methods
@@ -155,10 +164,10 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                out.writeObject(overwrite);
                out.writeObject(rowTypeInfo);
                out.writeObject(hiveTablePartition);
-               out.writeObject(partitionCols);
-               out.writeObject(dbName);
+               out.writeObject(partitionColumns);
+               out.writeObject(databaseName);
                out.writeObject(tableName);
-               out.writeObject(tblProperties);
+               out.writeObject(tableProperties);
        }
 
        @SuppressWarnings("unchecked")
@@ -178,11 +187,11 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                overwrite = (boolean) in.readObject();
                rowTypeInfo = (RowTypeInfo) in.readObject();
                hiveTablePartition = (HiveTablePartition) in.readObject();
-               partitionCols = (List<String>) in.readObject();
-               dbName = (String) in.readObject();
+               partitionColumns = (List<String>) in.readObject();
+               databaseName = (String) in.readObject();
                tableName = (String) in.readObject();
                partitionToWriter = new HashMap<>();
-               tblProperties = (Properties) in.readObject();
+               tableProperties = (Properties) in.readObject();
        }
 
        @Override
@@ -191,12 +200,27 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                Path stagingDir = new Path(jobSD.getLocation());
                FileSystem fs = stagingDir.getFileSystem(jobConf);
                try (HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class))) {
-                       Table table = client.getTable(dbName, tableName);
+                       Table table = client.getTable(databaseName, tableName);
                        if (!isDynamicPartition) {
                                commitJob(stagingDir.toString());
                        }
                        if (isPartitioned) {
-                               // TODO: to be implemented
+                               if (isDynamicPartition) {
+                                       FileStatus[] generatedParts = 
HiveStatsUtils.getFileStatusRecurse(stagingDir,
+                                               partitionColumns.size() - 
hiveTablePartition.getPartitionSpec().size(), fs);
+                                       for (FileStatus part : generatedParts) {
+                                               
commitJob(part.getPath().toString());
+                                               LinkedHashMap<String, String> 
fullPartSpec = new LinkedHashMap<>();
+                                               
Warehouse.makeSpecFromName(fullPartSpec, part.getPath());
+                                               loadPartition(part.getPath(), 
table, fullPartSpec, client);
+                                       }
+                               } else {
+                                       LinkedHashMap<String, String> partSpec 
= new LinkedHashMap<>();
+                                       for (String partCol : 
hiveTablePartition.getPartitionSpec().keySet()) {
+                                               partSpec.put(partCol, 
hiveTablePartition.getPartitionSpec().get(partCol).toString());
+                                       }
+                                       loadPartition(stagingDir, table, 
partSpec, client);
+                               }
                        } else {
                                moveFiles(stagingDir, new 
Path(table.getSd().getLocation()));
                        }
@@ -223,7 +247,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                        serializer = (AbstractSerDe) 
Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
                        ReflectionUtils.setConf(serializer, jobConf);
                        // TODO: support partition properties, for now assume 
they're same as table properties
-                       SerDeUtils.initializeSerDe(serializer, jobConf, 
tblProperties, null);
+                       SerDeUtils.initializeSerDe(serializer, jobConf, 
tableProperties, null);
                        outputClass = serializer.getSerializedClass();
                } catch (IllegalAccessException | SerDeException | 
InstantiationException | ClassNotFoundException e) {
                        throw new FlinkRuntimeException("Error initializing 
Hive serializer", e);
@@ -243,10 +267,12 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
 
                if (!isDynamicPartition) {
                        staticWriter = 
writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
+               } else {
+                       dynamicPartitionOffset = rowTypeInfo.getArity() - 
partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
                }
 
                List<ObjectInspector> objectInspectors = new ArrayList<>();
-               for (int i = 0; i < rowTypeInfo.getArity() - 
partitionCols.size(); i++) {
+               for (int i = 0; i < rowTypeInfo.getArity() - 
partitionColumns.size(); i++) {
                        
objectInspectors.add(HiveTableUtil.getObjectInspector(LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i))));
                }
 
@@ -254,12 +280,12 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                        rowObjectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(
                                Arrays.asList(rowTypeInfo.getFieldNames()),
                                objectInspectors);
-                       numNonPartitionCols = rowTypeInfo.getArity();
+                       numNonPartitionColumns = rowTypeInfo.getArity();
                } else {
                        rowObjectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(
-                               
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - 
partitionCols.size()),
+                               
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - 
partitionColumns.size()),
                                objectInspectors);
-                       numNonPartitionCols = rowTypeInfo.getArity() - 
partitionCols.size();
+                       numNonPartitionColumns = rowTypeInfo.getArity() - 
partitionColumns.size();
                }
        }
 
@@ -268,11 +294,48 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                try {
                        HivePartitionWriter partitionWriter = staticWriter;
                        if (isDynamicPartition) {
-                               // TODO: to be implemented
+                               LinkedHashMap<String, String> dynPartSpec = new 
LinkedHashMap<>();
+                               // only need to check the dynamic partitions
+                               final int numStaticPart = 
hiveTablePartition.getPartitionSpec().size();
+                               for (int i = dynamicPartitionOffset; i < 
record.getArity(); i++) {
+                                       // TODO: seems Hive also just calls 
toString(), need further investigation to confirm
+                                       // TODO: validate partition value
+                                       String partVal = 
record.getField(i).toString();
+                                       dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partVal);
+                               }
+                               String partName = 
Warehouse.makePartPath(dynPartSpec);
+                               partitionWriter = 
partitionToWriter.get(partName);
+                               if (partitionWriter == null) {
+                                       String stagingDir = 
hiveTablePartition.getStorageDescriptor().getLocation();
+                                       partitionWriter = 
writerForLocation(stagingDir + Path.SEPARATOR + partName);
+                                       partitionToWriter.put(partName, 
partitionWriter);
+                               }
                        }
                        
partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record),
 rowObjectInspector));
                } catch (IOException | SerDeException e) {
                        throw new IOException("Could not write Record.", e);
+               } catch (MetaException e) {
+                       throw new CatalogException(e);
+               }
+       }
+
+       // load a single partition
+       private void loadPartition(Path srcDir, Table table, Map<String, 
String> partSpec, HiveMetastoreClientWrapper client)
+                       throws TException, IOException {
+               Path tblLocation = new Path(table.getSd().getLocation());
+               List<Partition> existingPart = 
client.listPartitions(databaseName, tableName,
+                               new ArrayList<>(partSpec.values()), (short) 1);
+               Path destDir = existingPart.isEmpty() ? new Path(tblLocation, 
Warehouse.makePartPath(partSpec)) :
+                               new 
Path(existingPart.get(0).getSd().getLocation());
+               moveFiles(srcDir, destDir);
+               // register new partition if it doesn't exist
+               if (existingPart.isEmpty()) {
+                       StorageDescriptor sd = new 
StorageDescriptor(hiveTablePartition.getStorageDescriptor());
+                       sd.setLocation(destDir.toString());
+                       Partition partition = 
HiveTableUtil.createHivePartition(databaseName, tableName,
+                                       new ArrayList<>(partSpec.values()), sd, 
new HashMap<>());
+                       partition.setValues(new ArrayList<>(partSpec.values()));
+                       client.add_partition(partition);
                }
        }
 
@@ -320,8 +383,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
 
        // converts a Row to a list so that Hive can serialize it
        private Object getConvertedRow(Row record) {
-               List<Object> res = new ArrayList<>(numNonPartitionCols);
-               for (int i = 0; i < numNonPartitionCols; i++) {
+               List<Object> res = new ArrayList<>(numNonPartitionColumns);
+               for (int i = 0; i < numNonPartitionColumns; i++) {
                        res.add(record.getField(i));
                }
                return res;
@@ -388,7 +451,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                FileSinkOperator.RecordWriter recordWriter;
                try {
                        recordWriter = 
HiveFileFormatUtils.getRecordWriter(clonedConf, outputFormat,
-                               outputClass, isCompressed, tblProperties, 
taskPath, Reporter.NULL);
+                               outputClass, isCompressed, tableProperties, 
taskPath, Reporter.NULL);
                } catch (HiveException e) {
                        throw new IOException(e);
                }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
deleted file mode 100644
index 70d3852..0000000
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
-import org.apache.flink.table.types.DataType;
-
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import java.io.IOException;
-
-/**
- * Util class for accessing Hive tables.
- */
-public class HiveTableUtil {
-
-       private HiveTableUtil() {
-       }
-
-       /**
-        * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
-        */
-       public static ObjectInspector getObjectInspector(DataType flinkType) 
throws IOException {
-               return 
getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
-       }
-
-       // TODO: reuse Hive's TypeInfoUtils?
-       private static ObjectInspector getObjectInspector(TypeInfo type) throws 
IOException {
-               switch (type.getCategory()) {
-
-                       case PRIMITIVE:
-                               PrimitiveTypeInfo primitiveType = 
(PrimitiveTypeInfo) type;
-                               return 
PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
-
-                       // TODO: support complex types
-                       default:
-                               throw new IOException("Unsupported Hive type 
category " + type.getCategory());
-               }
-       }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 2068f68..78cb3bd 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -784,7 +784,6 @@ public class HiveCatalog extends AbstractCatalog {
 
        private Partition instantiateHivePartition(Table hiveTable, 
CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
                        throws PartitionSpecInvalidException {
-               Partition partition = new Partition();
                List<String> partCols = 
getFieldNames(hiveTable.getPartitionKeys());
                List<String> partValues = getOrderedFullPartitionValues(
                        partitionSpec, partCols, new 
ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()));
@@ -797,15 +796,10 @@ public class HiveCatalog extends AbstractCatalog {
                }
                // TODO: handle GenericCatalogPartition
                HiveCatalogPartition hiveCatalogPartition = 
(HiveCatalogPartition) catalogPartition;
-               partition.setValues(partValues);
-               partition.setDbName(hiveTable.getDbName());
-               partition.setTableName(hiveTable.getTableName());
-               partition.setCreateTime((int) (System.currentTimeMillis() / 
1000));
-               partition.setParameters(hiveCatalogPartition.getProperties());
-               partition.setSd(hiveTable.getSd().deepCopy());
-               
partition.getSd().setLocation(hiveCatalogPartition.getLocation());
-
-               return partition;
+               StorageDescriptor sd = hiveTable.getSd().deepCopy();
+               sd.setLocation(hiveCatalogPartition.getLocation());
+               return HiveTableUtil.createHivePartition(hiveTable.getDbName(), 
hiveTable.getTableName(), partValues,
+                               sd, hiveCatalogPartition.getProperties());
        }
 
        private static CatalogPartition instantiateCatalogPartition(Partition 
hivePartition) {
@@ -822,7 +816,7 @@ public class HiveCatalog extends AbstractCatalog {
        /**
         * Get field names from field schemas.
         */
-       private static List<String> getFieldNames(List<FieldSchema> 
fieldSchemas) {
+       public static List<String> getFieldNames(List<FieldSchema> 
fieldSchemas) {
                List<String> names = new ArrayList<>(fieldSchemas.size());
                for (FieldSchema fs : fieldSchemas) {
                        names.add(fs.getName());
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 712e709..d4ed021 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,17 +18,24 @@
 
 package org.apache.flink.table.catalog.hive.util;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -115,4 +122,42 @@ public class HiveTableUtil {
                return properties;
        }
 
+       /**
+        * Creates a Hive partition instance.
+        */
+       public static Partition createHivePartition(String dbName, String 
tableName, List<String> values,
+                       StorageDescriptor sd, Map<String, String> parameters) {
+               Partition partition = new Partition();
+               partition.setDbName(dbName);
+               partition.setTableName(tableName);
+               partition.setValues(values);
+               partition.setParameters(parameters);
+               partition.setSd(sd);
+               int currentTime = (int) (System.currentTimeMillis() / 1000);
+               partition.setCreateTime(currentTime);
+               partition.setLastAccessTime(currentTime);
+               return partition;
+       }
+
+       /**
+        * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+        */
+       public static ObjectInspector getObjectInspector(DataType flinkType) 
throws IOException {
+               return 
getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
+       }
+
+       // TODO: reuse Hive's TypeInfoUtils?
+       private static ObjectInspector getObjectInspector(TypeInfo type) throws 
IOException {
+               switch (type.getCategory()) {
+
+                       case PRIMITIVE:
+                               PrimitiveTypeInfo primitiveType = 
(PrimitiveTypeInfo) type;
+                               return 
PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+                       // TODO: support complex types
+                       default:
+                               throw new IOException("Unsupported Hive type 
category " + type.getCategory());
+               }
+       }
+
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 1f9aaa7..d348d4c 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
@@ -87,7 +86,7 @@ public class HiveInputFormatTest {
                );
                //Now we used metaStore client to create hive table instead of 
using hiveCatalog for it doesn't support set
                //serDe temporarily.
-               IMetaStoreClient client = 
RetryingMetaStoreClient.getProxy(hiveConf, null, null, 
HiveMetaStoreClient.class.getName(), true);
+               HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(hiveConf);
                org.apache.hadoop.hive.metastore.api.Table tbl = new 
org.apache.hadoop.hive.metastore.api.Table();
                tbl.setDbName(dbName);
                tbl.setTableName(tblName);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index e6d79bd..5bf32b6 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
 import org.apache.flink.table.catalog.hive.HiveCatalogTable;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.types.Row;
@@ -43,9 +45,13 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -76,18 +82,18 @@ public class HiveTableOutputFormatTest {
        public void testInsertIntoNonPartitionTable() throws Exception {
                String dbName = "default";
                String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
 
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
                outputFormat.open(0, 1);
                List<Row> toWrite = generateRecords(5);
                writeRecords(toWrite, outputFormat);
                outputFormat.close();
                outputFormat.finalizeGlobal(1);
 
-               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
                hiveCatalog.dropTable(tablePath, false);
        }
 
@@ -95,32 +101,83 @@ public class HiveTableOutputFormatTest {
        public void testInsertOverwrite() throws Exception {
                String dbName = "default";
                String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 
                // write some data and verify
-               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
                outputFormat.open(0, 1);
                List<Row> toWrite = generateRecords(5);
                writeRecords(toWrite, outputFormat);
                outputFormat.close();
                outputFormat.finalizeGlobal(1);
-               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
 
                // write some data to overwrite existing data and verify
-               outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, true);
+               outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, true);
                outputFormat.open(0, 1);
                toWrite = generateRecords(3);
                writeRecords(toWrite, outputFormat);
                outputFormat.close();
                outputFormat.finalizeGlobal(1);
-               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
 
                hiveCatalog.dropTable(tablePath, false);
        }
 
-       private RowTypeInfo createDestTable(String dbName, String tblName) throws Exception {
+       @Test
+       public void testInsertIntoStaticPartition() throws Exception {
+               String dbName = "default";
+               String tblName = "dest";
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+               Map<String, Object> partSpec = new HashMap<>();
+               partSpec.put("s", "a");
+               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, partSpec, false);
+               outputFormat.open(0, 1);
+               List<Row> toWrite = generateRecords(1);
+               writeRecords(toWrite, outputFormat);
+               outputFormat.close();
+               outputFormat.finalizeGlobal(1);
+
+               // make sure new partition is created
+               assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
+               HiveCatalogPartition catalogPartition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
+                               partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
+               verifyWrittenData(new Path(catalogPartition.getLocation(), "0"), toWrite, 1);
+
+               hiveCatalog.dropTable(tablePath, false);
+       }
+
+       @Test
+       public void testInsertIntoDynamicPartition() throws Exception {
+               String dbName = "default";
+               String tblName = "dest";
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+               HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, Collections.emptyMap(), false);
+               outputFormat.open(0, 1);
+               List<Row> toWrite = generateRecords(5);
+               writeRecords(toWrite, outputFormat);
+               outputFormat.close();
+               outputFormat.finalizeGlobal(1);
+
+               List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
+               assertEquals(toWrite.size(), partitionSpecs.size());
+               for (int i = 0; i < toWrite.size(); i++) {
+                       HiveCatalogPartition partition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
+                       verifyWrittenData(new Path(partition.getLocation(), "0"), Collections.singletonList(toWrite.get(i)), 1);
+               }
+
+               hiveCatalog.dropTable(tablePath, false);
+       }
+
+       private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
                TableSchema tableSchema = new TableSchema(
                                new String[]{"i", "l", "d", "s"},
@@ -130,29 +187,41 @@ public class HiveTableOutputFormatTest {
                                                BasicTypeInfo.DOUBLE_TYPE_INFO,
                                                BasicTypeInfo.STRING_TYPE_INFO}
                );
-               HiveCatalogTable catalogTable = new HiveCatalogTable(tableSchema, new HashMap<>(), "");
+               HiveCatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
                hiveCatalog.createTable(tablePath, catalogTable, false);
                return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
        }
 
+       private HiveCatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
+               if (numPartCols == 0) {
+                       return new HiveCatalogTable(tableSchema, new HashMap<>(), "");
+               }
+               String[] partCols = new String[numPartCols];
+               System.arraycopy(tableSchema.getFieldNames(), tableSchema.getFieldNames().length - numPartCols, partCols, 0, numPartCols);
+               return new HiveCatalogTable(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
+       }
+
        private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath tablePath, Table hiveTable,
-                       RowTypeInfo rowTypeInfo, boolean overwrite) throws Exception {
+                       RowTypeInfo rowTypeInfo, Map<String, Object> partSpec, boolean overwrite) throws Exception {
                StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
                jobSD.setLocation(hiveTable.getSd().getLocation() + "/.staging");
-               HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, null);
+               HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, partSpec);
                JobConf jobConf = new JobConf(hiveConf);
                return new HiveTableOutputFormat(jobConf, tablePath.getDatabaseName(), tablePath.getObjectName(),
-                               Collections.emptyList(), rowTypeInfo, hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
+                               HiveCatalog.getFieldNames(hiveTable.getPartitionKeys()), rowTypeInfo, hiveTablePartition,
+                               MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
        }
 
-       private void verifyWrittenData(Path outputFile, List<Row> expected) throws Exception {
+       private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
                FileSystem fs = outputFile.getFileSystem(hiveConf);
                assertTrue(fs.exists(outputFile));
+               int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
                        int numWritten = 0;
                        String line = reader.readLine();
                        while (line != null) {
-                               assertEquals(expected.get(numWritten++).toString(), line.replaceAll("\u0001", ","));
+                               Row expectedRow = Row.project(expected.get(numWritten++), fields);
+                               assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
                                line = reader.readLine();
                        }
                        reader.close();

Reply via email to