This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6d910aa HIVE-24127: Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha)
6d910aa is described below
commit 6d910aa43d35e89f4a5c4b861f0ab3d94a641ca4
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Sep 14 11:27:44 2020 +0530
HIVE-24127: Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../TestReplicationScenariosExternalTables.java | 83 +++++++++++++++++++++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 3 +
.../messaging/event/filters/CatalogFilter.java | 39 ++++++++++
3 files changed, 124 insertions(+), 1 deletion(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 182c436..f4ef716 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -27,6 +27,12 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -35,10 +41,13 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
@@ -1218,6 +1227,78 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.verifyReplTargetProperty(replicatedDbName);
}
+ @Test
+ public void differentCatalogIncrementalReplication() throws Throwable {
+ //Create the catalog
+ Catalog catalog = new Catalog();
+ catalog.setName("spark");
+ Warehouse wh = new Warehouse(conf);
+ catalog.setLocationUri(wh.getWhRootExternal().toString() + File.separator + catalog);
+ catalog.setDescription("Non-hive catalog");
+ Hive.get(primary.hiveConf).getMSC().createCatalog(catalog);
+
+ //Create database and table in spark catalog
+ String sparkDbName = "src_spark";
+ Database sparkdb = new Database();
+ sparkdb.setCatalogName("spark");
+ sparkdb.setName(sparkDbName);
+ Hive.get(primary.hiveConf).getMSC().createDatabase(sparkdb);
+
+ SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(),
+ new HashMap<String, String>());
+ ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(1);
+ cols.add(new FieldSchema("place", serdeConstants.STRING_TYPE_NAME, ""));
+ StorageDescriptor sd
+ = new StorageDescriptor(cols, null,
+ "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+ false, 0, serdeInfo, null, null, null);
+ Map<String, String> tableParameters = new HashMap<String, String>();
+
+ Table sparkTable = new Table("mgt1", sparkDbName, "", 0, 0, 0,
+ sd, null, tableParameters, "", "", "");
+ sparkTable.setCatName("spark");
+ Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+ //create same db in hive catalog
+ Map<String, String> params = new HashMap<>();
+ params.put(SOURCE_OF_REPLICATION, "1");
+ Database hiveDb = new Database();
+ hiveDb.setCatalogName("hive");
+ hiveDb.setName(sparkDbName);
+ hiveDb.setParameters(params);
+ Hive.get(primary.hiveConf).getMSC().createDatabase(hiveDb);
+
+ primary.dump(sparkDbName);
+ //spark tables are not replicated in bootstrap
+ replica.load(replicatedDbName, sparkDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables like mgdt1")
+ .verifyResult(null);
+
+ Path externalTableLocation =
+ new Path("/" + testName.getMethodName() + "/t1/");
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+ //Create another table in spark
+ sparkTable = new Table("mgt2", sparkDbName, "", 0, 0, 0,
+ sd, null, tableParameters, "", "", "");
+ sparkTable.setCatName("spark");
+ Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+ //Incremental load shouldn't copy any events from spark catalog
+ primary.dump(sparkDbName);
+ replica.load(replicatedDbName, sparkDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables like mgdt1")
+ .verifyResult(null)
+ .run("show tables like 'mgt2'")
+ .verifyResult(null);
+
+ primary.run("drop database if exists " + sparkDbName + " cascade");
+ }
+
private void assertExternalFileInfo(List<String> expected, String dumplocation,
    boolean isIncremental) throws IOException {
assertExternalFileInfo(expected, dumplocation, null, isIncremental);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index c897bb4..21dafb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -536,6 +538,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
new ReplEventFilter(work.replScope),
+ new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)),
new EventBoundaryFilter(work.eventFrom, work.eventTo));
EventUtils.MSClientNotificationFetcher evFetcher
= new EventUtils.MSClientNotificationFetcher(hiveDb);
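
For illustration only, not part of the patch: with the CatalogFilter added to the chain, the composed filter rejects events recorded against any non-default catalog. A rough sketch follows, leaving out the ReplEventFilter (which needs a full ReplScope) and hard-coding "hive" where ReplDumpTask uses MetaStoreUtils.getDefaultCatalog(conf); the event-id window 100..200 is made up.

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;

public class DumpFilterSketch {
  public static void main(String[] args) {
    // Same composition as in ReplDumpTask above, minus the ReplEventFilter.
    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
        new CatalogFilter("hive"),
        new EventBoundaryFilter(100L, 200L));

    NotificationEvent hiveEvent = new NotificationEvent(150L, 0, "ADD_PARTITION", "{}");
    hiveEvent.setCatName("hive");
    NotificationEvent sparkEvent = new NotificationEvent(151L, 0, "CREATE_TABLE", "{}");
    sparkEvent.setCatName("spark");

    System.out.println(evFilter.accept(hiveEvent));  // true  - default catalog, inside the event window
    System.out.println(evFilter.accept(sparkEvent)); // false - "spark" catalog events are no longer dumped
  }
}
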
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
new file mode 100644
index 0000000..cca1125
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Notification event filter that accepts only events belonging to a given catalog.
+ */
+public class CatalogFilter extends BasicFilter {
+ private final String catalogName;
+
+ public CatalogFilter(final String catalogName) {
+ this.catalogName = catalogName;
+ }
+
+ @Override
+ boolean shouldAccept(final NotificationEvent event) {
+ if (catalogName == null || event.getCatName() == null || catalogName.equalsIgnoreCase(event.getCatName())) {
+ return true;
+ }
+ return false;
+ }
+}
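
For reference, not part of the commit: a minimal sketch of the filter's behaviour, assuming BasicFilter.accept(event) null-checks the event and delegates to shouldAccept(event), as the other filters in this package rely on. The class name CatalogFilterSketch is purely illustrative.

import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;

public class CatalogFilterSketch {
  public static void main(String[] args) {
    // Match only the default catalog, which is what ReplDumpTask passes in.
    CatalogFilter filter = new CatalogFilter("hive");

    NotificationEvent hiveEvent = new NotificationEvent(1L, 0, "CREATE_TABLE", "{}");
    hiveEvent.setCatName("hive");
    NotificationEvent sparkEvent = new NotificationEvent(2L, 0, "CREATE_TABLE", "{}");
    sparkEvent.setCatName("spark");
    NotificationEvent legacyEvent = new NotificationEvent(3L, 0, "CREATE_TABLE", "{}");
    // legacyEvent has no catName set, mirroring events written before catalogs existed.

    System.out.println(filter.accept(hiveEvent));   // true  - matches the default catalog (case-insensitive)
    System.out.println(filter.accept(sparkEvent));  // false - non-default catalog, so the dump skips it
    System.out.println(filter.accept(legacyEvent)); // true  - null catName is accepted for backward compatibility
  }
}
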