This is an automated email from the ASF dual-hosted git repository.
ngangam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b87ded7ba42 HIVE-27173: Add method for Spark to be able to trigger DML events (Naveen Gangam reviewed by Zhihua Deng) (#4201)
b87ded7ba42 is described below
commit b87ded7ba42094ace23e83b3ba44d4036eefd79d
Author: Naveen Gangam <[email protected]>
AuthorDate: Mon Apr 10 10:35:03 2023 -0400
HIVE-27173: Add method for Spark to be able to trigger DML events (Naveen Gangam reviewed by Zhihua Deng) (#4201)
---
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 41 ++++++++++
.../apache/hadoop/hive/ql/metadata/TestHive.java | 88 ++++++++++++++++++++++
.../hadoop/hive/ql/metadata/TestHiveRemote.java | 3 +
3 files changed, 132 insertions(+)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70ef5fc165b..d1321e733e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3799,6 +3799,47 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
+  /**
+   * This method helps callers trigger an INSERT event for DML queries without having to deal
+   * with HMS objects. It takes plain Java types as arguments.
+   * @param dbName Name of the Hive database this table belongs to.
+   * @param tblName Name of the Hive table this event is for.
+   * @param partitionSpec Map containing a key/value for each partition column. Can be null if the event is for a table.
+   * @param replace boolean indicating whether the file list replaces existing files; the files are treated as additions otherwise.
+   * @param newFiles List of file paths affected (added/replaced) by this DML query. Can be null.
+   * @throws HiveException if the table or partition does not exist, or on other internal errors while fetching them
+   */
+  public void fireInsertEvent(String dbName, String tblName,
+      Map<String, String> partitionSpec, boolean replace, List<String> newFiles)
+      throws HiveException {
+    if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.info("DML events not enabled. Set " + ConfVars.FIRE_EVENTS_FOR_DML.varname);
+      return;
+    }
+    Table table = getTable(dbName, tblName);
+    if (table != null && !table.isTemporary()) {
+      List<FileStatus> newFileStatusObject = null;
+      String parentDir = null;
+      if (newFiles != null && newFiles.size() > 0) {
+        newFileStatusObject = new ArrayList<>(newFiles.size());
+        if (partitionSpec != null && partitionSpec.size() > 0) {
+          // fetch the partition object to determine its location
+          Partition part = getPartition(table, partitionSpec, false);
+          parentDir = part.getLocation();
+        } else {
+          // fetch the table location
+          parentDir = table.getSd().getLocation();
+        }
+        for (String fileName : newFiles) {
+          FileStatus fStatus = new FileStatus();
+          fStatus.setPath(new Path(parentDir, fileName));
+          newFileStatusObject.add(fStatus);
+        }
+      }
+      fireInsertEvent(table, partitionSpec, replace, newFileStatusObject);
+    }
+  }
+
   private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec,
       boolean replace, List<FileStatus> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
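(For context: a minimal caller-side sketch of how an engine such as Spark might use the
new API. The database/table names, the partition column "ds", and the file names below
are illustrative and not part of this change; the conf toggle and the method signature
are taken from the diff above.)

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    // Hypothetical caller; all names and values here are illustrative.
    public class FireInsertEventExample {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // fireInsertEvent() is a no-op unless DML events are enabled.
        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
        Hive hiveDb = Hive.get(conf);

        // One key/value per partition column; pass null for an unpartitioned table.
        Map<String, String> partitionSpec = new HashMap<>();
        partitionSpec.put("ds", "2023-04-10");

        // File paths written by the DML query; relative names are resolved
        // against the partition (or table) location.
        List<String> newFiles = Arrays.asList("part-00000", "part-00001");

        // replace=false: the files are treated as additions rather than replacements.
        hiveDb.fireInsertEvent("default", "web_logs", partitionSpec, false, newFiles);
      }
    }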
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
index 142f5cec22d..897e2e20026 100755
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.metadata;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,11 +32,13 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -74,6 +78,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.Assert;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
@@ -107,6 +112,8 @@ public class TestHive {
hiveConf.setFloat("fs.trash.checkpoint.interval", 30); //
FS_TRASH_CHECKPOINT_INTERVAL_KEY (hadoop-2)
hiveConf.setFloat("fs.trash.interval", 30); //
FS_TRASH_INTERVAL_KEY (hadoop-2)
hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+ MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS,
DummyFireInsertListener.class.getName());
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST,
true);
SessionState.start(hiveConf);
try {
@@ -883,6 +890,69 @@ public class TestHive {
assertTrue(prevHiveObj != newHiveObj);
}
+  @Test
+  public void testFireInsertEvent() throws Throwable {
+    Hive hiveDb = Hive.getWithFastCheck(hiveConf, false);
+    String tableName = "test_fire_insert_event";
+    hiveDb.dropTable(tableName);
+    hiveDb.createTable(tableName, Lists.newArrayList("col1"), null, TextInputFormat.class,
+        HiveIgnoreKeyTextOutputFormat.class);
+    Table table = hiveDb.getTable(tableName);
+    Path tablePath = table.getDataLocation();
+    // Create some files that were "inserted"
+    FileSystem fileSystem = tablePath.getFileSystem(hiveConf);
+    fileSystem.deleteOnExit(tablePath);
+    Path insert1 = new Path(tablePath, "insert1"), insert2 = new Path(tablePath, "insert2"),
+        insert3 = new Path(tablePath, "insert3");
+
+    try (OutputStream os1 = fileSystem.create(insert1);
+         OutputStream os2 = fileSystem.create(insert2);
+         OutputStream os3 = fileSystem.create(insert3)) {
+      os1.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world1").toString().getBytes());
+      os2.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world2").toString().getBytes());
+      os3.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world3").toString().getBytes());
+    }
+
+    // Fire the INSERT event
+    hiveDb.fireInsertEvent(hiveDb.getDatabaseCurrent().getName(), tableName, null, true,
+        Arrays.asList(insert1.toString(), insert2.toString(), insert3.toString()));
+    // Get the last metastore event
+    InsertEvent insertEvent = DummyFireInsertListener.getLastEvent();
+    // Check the event
+    Assert.assertNotNull(insertEvent);
+    Assert.assertNotNull(insertEvent.getTableObj());
+    Assert.assertEquals(tableName, insertEvent.getTableObj().getTableName());
+    Assert.assertEquals(hiveDb.getDatabaseCurrent().getName(), insertEvent.getTableObj().getDbName());
+    Set<String> insertFiles = new HashSet<>(insertEvent.getFiles());
+    Set<String> expectedFiles = Sets.newHashSet(insert1.toString(), insert2.toString(), insert3.toString());
+    Assert.assertEquals(3, insertFiles.size());
+    for (String insertFile : insertFiles) {
+      Assert.assertTrue(expectedFiles.contains(insertFile));
+    }
+    Map<String, String> expectedCheckSums = new HashMap<>();
+    expectedCheckSums.put("insert1", getFileCheckSum(fileSystem, insert1));
+    expectedCheckSums.put("insert2", getFileCheckSum(fileSystem, insert2));
+    expectedCheckSums.put("insert3", getFileCheckSum(fileSystem, insert3));
+    List<String> checkSums = insertEvent.getFileChecksums();
+    Assert.assertEquals(3, checkSums.size());
+    for (int i = 0; i < 3; i++) {
+      Path insertedPath = new Path(insertEvent.getFiles().get(i));
+      Assert.assertEquals(expectedCheckSums.get(insertedPath.getName()), checkSums.get(i));
+    }
+  }
+
+  private String getFileCheckSum(FileSystem fileSystem, Path p) throws Exception {
+    FileChecksum cksum = fileSystem.getFileChecksum(p);
+    if (cksum != null) {
+      return StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
+    }
+    return "";
+  }
+
// shamelessly copied from Path in hadoop-2
private static final String SEPARATOR = "/";
private static final char SEPARATOR_CHAR = '/';
@@ -917,4 +987,22 @@ public class TestHive {
private static boolean hasWindowsDrive(String path) {
return (WINDOWS && hasDriveLetterSpecifier.matcher(path).find());
}
+
+  public static class DummyFireInsertListener extends MetaStoreEventListener {
+    private static final List<InsertEvent> notifyList = new ArrayList<>();
+    public DummyFireInsertListener(org.apache.hadoop.conf.Configuration conf) {
+      super(conf);
+    }
+    @Override
+    public void onInsert(InsertEvent insertEvent) throws MetaException {
+      notifyList.add(insertEvent);
+    }
+    public static InsertEvent getLastEvent() {
+      if (notifyList.isEmpty()) {
+        return null;
+      } else {
+        return notifyList.get(notifyList.size() - 1);
+      }
+    }
+  }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
index 5a695e94199..78ec1596944 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
@@ -45,6 +46,8 @@ public class TestHiveRemote extends TestHive {
hiveConf = new HiveConf(TestHiveRemote.class);
hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, DummyFireInsertListener.class.getName());
MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf);
}