This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c9a5af3 [FLINK-12715][hive] fix Hive-1.2.1 build
c9a5af3 is described below
commit c9a5af37dc329a336a22b316a5a18c01dc8f5009
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 4 15:51:12 2019 +0800
[FLINK-12715][hive] fix Hive-1.2.1 build
This PR fixes the Hive-1.2.1 build. The build was broken because a util
method used in FLINK-12568 has incompatible signatures in Hive-2.3.4 and Hive-1.2.1.
This closes #8610.
---
.../connectors/hive/HiveTableOutputFormat.java | 5 +-
.../flink/table/catalog/hive/client/HiveShim.java | 23 +++++-
.../table/catalog/hive/client/HiveShimV1.java | 16 ++++
.../table/catalog/hive/client/HiveShimV2.java | 16 ++++
.../connectors/hive/HiveTableOutputFormatTest.java | 85 +++++++++++++++++-----
5 files changed, 123 insertions(+), 22 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 324c6c6..dbfbf5a 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -28,6 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -286,8 +288,9 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
// Note we assume the srcDir is a hidden dir,
otherwise it will be deleted if it's a sub-dir of destDir
FileStatus[] existingFiles =
fs.listStatus(destDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (existingFiles != null) {
+ HiveShim hiveShim =
HiveShimLoader.loadHiveShim();
for (FileStatus existingFile :
existingFiles) {
-
Preconditions.checkState(FileUtils.moveToTrash(fs, existingFile.getPath(),
jobConf, purge),
+
Preconditions.checkState(hiveShim.moveToTrash(fs, existingFile.getPath(),
jobConf, purge),
"Failed to overwrite
existing file " + existingFile);
}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 2e1c195..b0eab75 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.catalog.hive.client;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Function;
@@ -25,10 +28,11 @@ import
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.util.List;
/**
- * A shim layer to support different versions of HMS.
+ * A shim layer to support different versions of Hive.
*/
public interface HiveShim {
@@ -62,4 +66,21 @@ public interface HiveShim {
* @throws TException for any other generic exceptions
caused by Thrift
*/
Function getFunction(IMetaStoreClient client, String dbName, String
functionName) throws NoSuchObjectException, TException;
+
+ /**
+ * Moves a particular file or directory to trash.
+ * The file/directory can potentially be deleted (w/o going to trash)
if purge is set to true, or if it cannot
+ * be moved properly.
+ *
+ * <p>This interface is here because FileUtils.moveToTrash in different
Hive versions have different signatures.
+ *
+ * @param fs the FileSystem to use
+ * @param path the path of the file or directory to be moved to trash.
+ * @param conf the Configuration to use
+     * @param purge whether to try to skip trash and directly delete the
file/directory. This flag may be ignored by
+ * old Hive versions prior to 2.3.0.
+ * @return true if the move is successful, and false otherwise
+ * @throws IOException if the file/directory cannot be properly moved
or deleted
+ */
+ boolean moveToTrash(FileSystem fs, Path path, Configuration conf,
boolean purge) throws IOException;
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index d22ff39..f28fc5c 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -30,6 +34,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -79,4 +85,14 @@ public class HiveShimV1 implements HiveShim {
throw e;
}
}
+
+ @Override
+ public boolean moveToTrash(FileSystem fs, Path path, Configuration
conf, boolean purge) throws IOException {
+ try {
+ Method method =
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
Configuration.class);
+ return (boolean) method.invoke(null, fs, path, conf);
+ } catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
+ throw new IOException("Failed to move " + path + " to
trash", e);
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 5fbdd9a..58bb460 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -29,6 +33,7 @@ import
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
@@ -70,4 +75,15 @@ public class HiveShimV2 implements HiveShim {
public Function getFunction(IMetaStoreClient client, String dbName,
String functionName) throws NoSuchObjectException, TException {
return client.getFunction(dbName, functionName);
}
+
+ @Override
+ public boolean moveToTrash(FileSystem fs, Path path, Configuration
conf, boolean purge) throws IOException {
+ try {
+ Method method =
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
+ Configuration.class, boolean.class);
+ return (boolean) method.invoke(null, fs, path, conf,
purge);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ throw new IOException("Failed to move " + path + " to
trash", e);
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index ff44364..5754d59 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -74,8 +74,53 @@ public class HiveTableOutputFormatTest {
@Test
public void testInsertIntoNonPartitionTable() throws Exception {
- final String dbName = "default";
- final String tblName = "dest";
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+ outputFormat.open(0, 1);
+ List<Row> toWrite = generateRecords(5);
+ writeRecords(toWrite, outputFormat);
+ outputFormat.close();
+ outputFormat.finalizeGlobal(1);
+
+ verifyWrittenData(new Path(hiveTable.getSd().getLocation(),
"0"), toWrite);
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+ @Test
+ public void testInsertOverwrite() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+ // write some data and verify
+ HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+ outputFormat.open(0, 1);
+ List<Row> toWrite = generateRecords(5);
+ writeRecords(toWrite, outputFormat);
+ outputFormat.close();
+ outputFormat.finalizeGlobal(1);
+ verifyWrittenData(new Path(hiveTable.getSd().getLocation(),
"0"), toWrite);
+
+ // write some data to overwrite existing data and verify
+ outputFormat = createHiveTableOutputFormat(tablePath,
hiveTable, rowTypeInfo, true);
+ outputFormat.open(0, 1);
+ toWrite = generateRecords(3);
+ writeRecords(toWrite, outputFormat);
+ outputFormat.close();
+ outputFormat.finalizeGlobal(1);
+ verifyWrittenData(new Path(hiveTable.getSd().getLocation(),
"0"), toWrite);
+
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+ private RowTypeInfo createDestTable(String dbName, String tblName)
throws Exception {
ObjectPath tablePath = new ObjectPath(dbName, tblName);
TableSchema tableSchema = new TableSchema(
new String[]{"i", "l", "d", "s"},
@@ -87,41 +132,41 @@ public class HiveTableOutputFormatTest {
);
HiveCatalogTable catalogTable = new
HiveCatalogTable(tableSchema, new HashMap<>(), "");
hiveCatalog.createTable(tablePath, catalogTable, false);
+ return new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames());
+ }
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
- RowTypeInfo rowTypeInfo = new
RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath
tablePath, Table hiveTable,
+ RowTypeInfo rowTypeInfo, boolean overwrite) throws
Exception {
StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
jobSD.setLocation(hiveTable.getSd().getLocation() +
"/.staging");
HiveTablePartition hiveTablePartition = new
HiveTablePartition(jobSD, null);
JobConf jobConf = new JobConf(hiveConf);
- HiveTableOutputFormat outputFormat = new
HiveTableOutputFormat(jobConf, dbName, tblName,
- Collections.emptyList(), rowTypeInfo,
hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), false);
- outputFormat.open(0, 1);
- List<Row> toWrite = generateRecords();
- for (Row row : toWrite) {
- outputFormat.writeRecord(row);
- }
- outputFormat.close();
- outputFormat.finalizeGlobal(1);
+ return new HiveTableOutputFormat(jobConf,
tablePath.getDatabaseName(), tablePath.getObjectName(),
+ Collections.emptyList(), rowTypeInfo,
hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
+ }
- // verify written data
- Path outputFile = new Path(hiveTable.getSd().getLocation(),
"0");
- FileSystem fs = outputFile.getFileSystem(jobConf);
+ private void verifyWrittenData(Path outputFile, List<Row> expected)
throws Exception {
+ FileSystem fs = outputFile.getFileSystem(hiveConf);
assertTrue(fs.exists(outputFile));
try (BufferedReader reader = new BufferedReader(new
InputStreamReader(fs.open(outputFile)))) {
int numWritten = 0;
String line = reader.readLine();
while (line != null) {
-
assertEquals(toWrite.get(numWritten++).toString(), line.replaceAll("\u0001",
","));
+
assertEquals(expected.get(numWritten++).toString(), line.replaceAll("\u0001",
","));
line = reader.readLine();
}
reader.close();
- assertEquals(toWrite.size(), numWritten);
+ assertEquals(expected.size(), numWritten);
+ }
+ }
+
+ private void writeRecords(List<Row> toWrite, HiveTableOutputFormat
outputFormat) throws IOException {
+ for (Row row : toWrite) {
+ outputFormat.writeRecord(row);
}
}
- private List<Row> generateRecords() {
- int numRecords = 5;
+ private List<Row> generateRecords(int numRecords) {
int arity = 4;
List<Row> res = new ArrayList<>(numRecords);
for (int i = 0; i < numRecords; i++) {