This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c9a5af3  [FLINK-12715][hive] fix Hive-1.2.1 build
c9a5af3 is described below

commit c9a5af37dc329a336a22b316a5a18c01dc8f5009
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 4 15:51:12 2019 +0800

    [FLINK-12715][hive] fix Hive-1.2.1 build
    
    This PR fixes the Hive-1.2.1 build. The build was broken because a util 
method used in FLINK-12568 is not compatible between Hive-2.3.4 and Hive-1.2.1.
    
    This closes #8610.
---
 .../connectors/hive/HiveTableOutputFormat.java     |  5 +-
 .../flink/table/catalog/hive/client/HiveShim.java  | 23 +++++-
 .../table/catalog/hive/client/HiveShimV1.java      | 16 ++++
 .../table/catalog/hive/client/HiveShimV2.java      | 16 ++++
 .../connectors/hive/HiveTableOutputFormatTest.java | 85 +++++++++++++++++-----
 5 files changed, 123 insertions(+), 22 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 324c6c6..dbfbf5a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -28,6 +28,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -286,8 +288,9 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                                // Note we assume the srcDir is a hidden dir, 
otherwise it will be deleted if it's a sub-dir of destDir
                                FileStatus[] existingFiles = 
fs.listStatus(destDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
                                if (existingFiles != null) {
+                                       HiveShim hiveShim = 
HiveShimLoader.loadHiveShim();
                                        for (FileStatus existingFile : 
existingFiles) {
-                                               
Preconditions.checkState(FileUtils.moveToTrash(fs, existingFile.getPath(), 
jobConf, purge),
+                                               
Preconditions.checkState(hiveShim.moveToTrash(fs, existingFile.getPath(), 
jobConf, purge),
                                                        "Failed to overwrite 
existing file " + existingFile);
                                        }
                                }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 2e1c195..b0eab75 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.catalog.hive.client;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -25,10 +28,11 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
- * A shim layer to support different versions of HMS.
+ * A shim layer to support different versions of Hive.
  */
 public interface HiveShim {
 
@@ -62,4 +66,21 @@ public interface HiveShim {
         * @throws TException            for any other generic exceptions 
caused by Thrift
         */
        Function getFunction(IMetaStoreClient client, String dbName, String 
functionName) throws NoSuchObjectException, TException;
+
+       /**
+        * Moves a particular file or directory to trash.
+        * The file/directory can potentially be deleted (w/o going to trash) 
if purge is set to true, or if it cannot
+        * be moved properly.
+        *
+        * <p>This method exists because FileUtils.moveToTrash has different 
signatures in different Hive versions.
+        *
+        * @param fs    the FileSystem to use
+        * @param path  the path of the file or directory to be moved to trash.
+        * @param conf  the Configuration to use
+        * @param purge whether to try to skip trash and directly delete the 
file/directory. This flag may be ignored by
+        *              old Hive versions prior to 2.3.0.
+        * @return true if the move is successful, and false otherwise
+        * @throws IOException if the file/directory cannot be properly moved 
or deleted
+        */
+       boolean moveToTrash(FileSystem fs, Path path, Configuration conf, 
boolean purge) throws IOException;
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index d22ff39..f28fc5c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -30,6 +34,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
@@ -79,4 +85,14 @@ public class HiveShimV1 implements HiveShim {
                        throw e;
                }
        }
+
+       @Override
+       public boolean moveToTrash(FileSystem fs, Path path, Configuration 
conf, boolean purge) throws IOException {
+               try {
+                       Method method = 
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, 
Configuration.class);
+                       return (boolean) method.invoke(null, fs, path, conf);
+               } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+                       throw new IOException("Failed to move " + path + " to 
trash", e);
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 5fbdd9a..58bb460 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -29,6 +33,7 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -70,4 +75,15 @@ public class HiveShimV2 implements HiveShim {
        public Function getFunction(IMetaStoreClient client, String dbName, 
String functionName) throws NoSuchObjectException, TException {
                return client.getFunction(dbName, functionName);
        }
+
+       @Override
+       public boolean moveToTrash(FileSystem fs, Path path, Configuration 
conf, boolean purge) throws IOException {
+               try {
+                       Method method = 
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
+                                       Configuration.class, boolean.class);
+                       return (boolean) method.invoke(null, fs, path, conf, 
purge);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new IOException("Failed to move " + path + " to 
trash", e);
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index ff44364..5754d59 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -74,8 +74,53 @@ public class HiveTableOutputFormatTest {
 
        @Test
        public void testInsertIntoNonPartitionTable() throws Exception {
-               final String dbName = "default";
-               final String tblName = "dest";
+               String dbName = "default";
+               String tblName = "dest";
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+               outputFormat.open(0, 1);
+               List<Row> toWrite = generateRecords(5);
+               writeRecords(toWrite, outputFormat);
+               outputFormat.close();
+               outputFormat.finalizeGlobal(1);
+
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), 
"0"), toWrite);
+               hiveCatalog.dropTable(tablePath, false);
+       }
+
+       @Test
+       public void testInsertOverwrite() throws Exception {
+               String dbName = "default";
+               String tblName = "dest";
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+               // write some data and verify
+               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+               outputFormat.open(0, 1);
+               List<Row> toWrite = generateRecords(5);
+               writeRecords(toWrite, outputFormat);
+               outputFormat.close();
+               outputFormat.finalizeGlobal(1);
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), 
"0"), toWrite);
+
+               // write some data to overwrite existing data and verify
+               outputFormat = createHiveTableOutputFormat(tablePath, 
hiveTable, rowTypeInfo, true);
+               outputFormat.open(0, 1);
+               toWrite = generateRecords(3);
+               writeRecords(toWrite, outputFormat);
+               outputFormat.close();
+               outputFormat.finalizeGlobal(1);
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), 
"0"), toWrite);
+
+               hiveCatalog.dropTable(tablePath, false);
+       }
+
+       private RowTypeInfo createDestTable(String dbName, String tblName) 
throws Exception {
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
                TableSchema tableSchema = new TableSchema(
                                new String[]{"i", "l", "d", "s"},
@@ -87,41 +132,41 @@ public class HiveTableOutputFormatTest {
                );
                HiveCatalogTable catalogTable = new 
HiveCatalogTable(tableSchema, new HashMap<>(), "");
                hiveCatalog.createTable(tablePath, catalogTable, false);
+               return new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
+       }
 
-               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-               RowTypeInfo rowTypeInfo = new 
RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+       private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath 
tablePath, Table hiveTable,
+                       RowTypeInfo rowTypeInfo, boolean overwrite) throws 
Exception {
                StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
                jobSD.setLocation(hiveTable.getSd().getLocation() + 
"/.staging");
                HiveTablePartition hiveTablePartition = new 
HiveTablePartition(jobSD, null);
                JobConf jobConf = new JobConf(hiveConf);
-               HiveTableOutputFormat outputFormat = new 
HiveTableOutputFormat(jobConf, dbName, tblName,
-                               Collections.emptyList(), rowTypeInfo, 
hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), false);
-               outputFormat.open(0, 1);
-               List<Row> toWrite = generateRecords();
-               for (Row row : toWrite) {
-                       outputFormat.writeRecord(row);
-               }
-               outputFormat.close();
-               outputFormat.finalizeGlobal(1);
+               return new HiveTableOutputFormat(jobConf, 
tablePath.getDatabaseName(), tablePath.getObjectName(),
+                               Collections.emptyList(), rowTypeInfo, 
hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
+       }
 
-               // verify written data
-               Path outputFile = new Path(hiveTable.getSd().getLocation(), 
"0");
-               FileSystem fs = outputFile.getFileSystem(jobConf);
+       private void verifyWrittenData(Path outputFile, List<Row> expected) 
throws Exception {
+               FileSystem fs = outputFile.getFileSystem(hiveConf);
                assertTrue(fs.exists(outputFile));
                try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(fs.open(outputFile)))) {
                        int numWritten = 0;
                        String line = reader.readLine();
                        while (line != null) {
-                               
assertEquals(toWrite.get(numWritten++).toString(), line.replaceAll("\u0001", 
","));
+                               
assertEquals(expected.get(numWritten++).toString(), line.replaceAll("\u0001", 
","));
                                line = reader.readLine();
                        }
                        reader.close();
-                       assertEquals(toWrite.size(), numWritten);
+                       assertEquals(expected.size(), numWritten);
+               }
+       }
+
+       private void writeRecords(List<Row> toWrite, HiveTableOutputFormat 
outputFormat) throws IOException {
+               for (Row row : toWrite) {
+                       outputFormat.writeRecord(row);
                }
        }
 
-       private List<Row> generateRecords() {
-               int numRecords = 5;
+       private List<Row> generateRecords(int numRecords) {
                int arity = 4;
                List<Row> res = new ArrayList<>(numRecords);
                for (int i = 0; i < numRecords; i++) {

Reply via email to