This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 6d910aa HIVE-24127:Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha) 6d910aa is described below commit 6d910aa43d35e89f4a5c4b861f0ab3d94a641ca4 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Mon Sep 14 11:27:44 2020 +0530 HIVE-24127:Dump events from default catalog only (Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../TestReplicationScenariosExternalTables.java | 83 +++++++++++++++++++++- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 3 + .../messaging/event/filters/CatalogFilter.java | 39 ++++++++++ 3 files changed, 124 insertions(+), 1 deletion(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 182c436..f4ef716 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -27,6 +27,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; @@ -35,10 +41,13 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Assert; @@ -1218,6 +1227,78 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyReplTargetProperty(replicatedDbName); } + @Test + public void differentCatalogIncrementalReplication() throws Throwable { + //Create the catalog + Catalog catalog = new Catalog(); + catalog.setName("spark"); + Warehouse wh = new Warehouse(conf); + catalog.setLocationUri(wh.getWhRootExternal().toString() + File.separator + catalog); + catalog.setDescription("Non-hive catalog"); + Hive.get(primary.hiveConf).getMSC().createCatalog(catalog); + + //Create database and table in spark catalog + String sparkDbName = "src_spark"; + Database sparkdb = new Database(); + sparkdb.setCatalogName("spark"); + sparkdb.setName(sparkDbName); + Hive.get(primary.hiveConf).getMSC().createDatabase(sparkdb); + + SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), + new HashMap<String, String>()); + ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(1); + cols.add(new FieldSchema("place", serdeConstants.STRING_TYPE_NAME, "")); + StorageDescriptor sd + = new 
StorageDescriptor(cols, null, + "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", + "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat", + false, 0, serdeInfo, null, null, null); + Map<String, String> tableParameters = new HashMap<String, String>(); + + Table sparkTable = new Table("mgt1", sparkDbName, "", 0, 0, 0, + sd, null, tableParameters, "", "", ""); + sparkTable.setCatName("spark"); + Hive.get(primary.hiveConf).getMSC().createTable(sparkTable); + + //create same db in hive catalog + Map<String, String> params = new HashMap<>(); + params.put(SOURCE_OF_REPLICATION, "1"); + Database hiveDb = new Database(); + hiveDb.setCatalogName("hive"); + hiveDb.setName(sparkDbName); + hiveDb.setParameters(params); + Hive.get(primary.hiveConf).getMSC().createDatabase(hiveDb); + + primary.dump(sparkDbName); + //spark tables are not replicated in bootstrap + replica.load(replicatedDbName, sparkDbName) + .run("use " + replicatedDbName) + .run("show tables like mgdt1") + .verifyResult(null); + + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t1/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + //Create another table in spark + sparkTable = new Table("mgt2", sparkDbName, "", 0, 0, 0, + sd, null, tableParameters, "", "", ""); + sparkTable.setCatName("spark"); + Hive.get(primary.hiveConf).getMSC().createTable(sparkTable); + + //Incremental load shouldn't copy any events from spark catalog + primary.dump(sparkDbName); + replica.load(replicatedDbName, sparkDbName) + .run("use " + replicatedDbName) + .run("show tables like mgdt1") + .verifyResult(null) + .run("show tables like 'mgt2'") + .verifyResult(null); + + primary.run("drop database if exists " + sparkDbName + " cascade"); + } + private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental) throws IOException { assertExternalFileInfo(expected, dumplocation, null, 
isIncremental); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index c897bb4..21dafb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,8 +41,10 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -536,6 +538,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId); IMetaStoreClient.NotificationFilter evFilter = new AndFilter( new ReplEventFilter(work.replScope), + new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)), new EventBoundaryFilter(work.eventFrom, work.eventTo)); EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(hiveDb); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java new file mode 100644 index 0000000..cca1125 --- /dev/null +++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * Notification filter that accepts events whose catalog name matches the given catalog name (case-insensitive); events that carry no catalog name are always accepted, as is every event when the filter's catalog name is null. + */ +public class CatalogFilter extends BasicFilter { + private final String catalogName; + + public CatalogFilter(final String catalogName) { + this.catalogName = catalogName; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + if (catalogName == null || event.getCatName() == null || catalogName.equalsIgnoreCase(event.getCatName())) { + return true; + } + return false; + } +}