This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d910aa  HIVE-24127: Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha)
6d910aa is described below

commit 6d910aa43d35e89f4a5c4b861f0ab3d94a641ca4
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Mon Sep 14 11:27:44 2020 +0530

    HIVE-24127: Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../TestReplicationScenariosExternalTables.java    | 83 +++++++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  3 +
 .../messaging/event/filters/CatalogFilter.java     | 39 ++++++++++
 3 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 182c436..f4ef716 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -27,6 +27,12 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -35,10 +41,13 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Assert;
@@ -1218,6 +1227,78 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyReplTargetProperty(replicatedDbName);
   }
 
+  @Test
+  public void differentCatalogIncrementalReplication() throws Throwable {
+    //Create the catalog
+    Catalog catalog = new Catalog();
+    catalog.setName("spark");
+    Warehouse wh = new Warehouse(conf);
+    catalog.setLocationUri(wh.getWhRootExternal().toString() + File.separator + catalog.getName());
+    catalog.setDescription("Non-hive catalog");
+    Hive.get(primary.hiveConf).getMSC().createCatalog(catalog);
+
+    //Create database and table in spark catalog
+    String sparkDbName = "src_spark";
+    Database sparkdb = new Database();
+    sparkdb.setCatalogName("spark");
+    sparkdb.setName(sparkDbName);
+    Hive.get(primary.hiveConf).getMSC().createDatabase(sparkdb);
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(),
+      new HashMap<String, String>());
+    ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(1);
+    cols.add(new FieldSchema("place", serdeConstants.STRING_TYPE_NAME, ""));
+    StorageDescriptor sd
+      = new StorageDescriptor(cols, null,
+      "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+      false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<String, String>();
+
+    Table sparkTable = new Table("mgt1", sparkDbName, "", 0, 0, 0,
+      sd, null, tableParameters, "", "", "");
+    sparkTable.setCatName("spark");
+    Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+    //create same db in hive catalog
+    Map<String, String> params = new HashMap<>();
+    params.put(SOURCE_OF_REPLICATION, "1");
+    Database hiveDb = new Database();
+    hiveDb.setCatalogName("hive");
+    hiveDb.setName(sparkDbName);
+    hiveDb.setParameters(params);
+    Hive.get(primary.hiveConf).getMSC().createDatabase(hiveDb);
+
+    primary.dump(sparkDbName);
+    //spark tables are not replicated in bootstrap
+    replica.load(replicatedDbName, sparkDbName)
+      .run("use " + replicatedDbName)
+      .run("show tables like 'mgt1'")
+      .verifyResult(null);
+
+    Path externalTableLocation =
+      new Path("/" + testName.getMethodName() + "/t1/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    //Create another table in spark
+    sparkTable = new Table("mgt2", sparkDbName, "", 0, 0, 0,
+      sd, null, tableParameters, "", "", "");
+    sparkTable.setCatName("spark");
+    Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+    //Incremental load shouldn't copy any events from spark catalog
+    primary.dump(sparkDbName);
+    replica.load(replicatedDbName, sparkDbName)
+      .run("use " + replicatedDbName)
+      .run("show tables like 'mgt1'")
+      .verifyResult(null)
+      .run("show tables like 'mgt2'")
+      .verifyResult(null);
+
+    primary.run("drop database if exists " + sparkDbName + " cascade");
+  }
+
   private void assertExternalFileInfo(List<String> expected, String dumplocation,
                                       boolean isIncremental) throws IOException {
     assertExternalFileInfo(expected, dumplocation, null, isIncremental);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index c897bb4..21dafb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -536,6 +538,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new ReplEventFilter(work.replScope),
+        new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)),
         new EventBoundaryFilter(work.eventFrom, work.eventTo));
     EventUtils.MSClientNotificationFetcher evFetcher
         = new EventUtils.MSClientNotificationFetcher(hiveDb);
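
For context, AndFilter accepts an event only when every chained filter accepts it, so adding CatalogFilter above makes the incremental dump skip notification events from non-default catalogs. A minimal sketch of that composition follows; the event values and the "spark" catalog name are illustrative, and ReplEventFilter is omitted for brevity:

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;

    public class FilterCompositionSketch {
      public static void main(String[] args) {
        // A notification event tagged with a non-default catalog (illustrative values).
        NotificationEvent event = new NotificationEvent(42L, 0, "CREATE_TABLE", "{}");
        event.setCatName("spark");

        // AndFilter, as used in ReplDumpTask above, accepts only when every sub-filter accepts.
        IMetaStoreClient.NotificationFilter filter = new AndFilter(
            new CatalogFilter("hive"),           // the default catalog name
            new EventBoundaryFilter(0L, 100L));  // event-id window

        System.out.println(filter.accept(event)); // false: event belongs to the "spark" catalog
        event.setCatName("hive");
        System.out.println(filter.accept(event)); // true: default catalog, within bounds
      }
    }
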
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
new file mode 100644
index 0000000..cca1125
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Notification event filter that matches events against a given catalog name.
+ * Events that carry no catalog name are accepted for backward compatibility.
+ */
+public class CatalogFilter extends BasicFilter {
+  private final String catalogName;
+
+  public CatalogFilter(final String catalogName) {
+    this.catalogName = catalogName;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    // Accept a matching catalog; also accept events with no catalog set and a null
+    // filter catalog, so pre-catalog events are never dropped.
+    return catalogName == null || event.getCatName() == null
+        || catalogName.equalsIgnoreCase(event.getCatName());
+  }
+}
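
One behavior worth noting in shouldAccept above: events that carry no catalog name are accepted rather than dropped, so notifications written before catalog support (or by clients that never set one) still replicate. A quick illustration, with made-up event values:

    CatalogFilter filter = new CatalogFilter("hive");

    NotificationEvent legacy = new NotificationEvent(1L, 0, "CREATE_TABLE", "{}");
    System.out.println(filter.accept(legacy)); // true: no catalog set on the event

    legacy.setCatName("HIVE");
    System.out.println(filter.accept(legacy)); // true: the comparison is case-insensitive

    legacy.setCatName("spark");
    System.out.println(filter.accept(legacy)); // false: non-default catalog is filtered out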
