This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d9ae535  [GOBBLIN-1396] Enable HiveWriter to consume gmce and register 
into hive MetadataStore
d9ae535 is described below

commit d9ae5353c74fdcd385835fca9b586b3fdb90971b
Author: Zihan Li <[email protected]>
AuthorDate: Tue Mar 23 15:21:10 2021 -0700

    [GOBBLIN-1396] Enable HiveWriter to consume gmce and register into hive 
MetadataStore
    
    [GOBBLIN-1396]Enable HiveWriter to consume gmce
    and register into hive MetadataStore
    
    bug fix
    
    add default branch to pass styleCheck
    
    bug fix to avoid dead lock
    
    address comments
    
    address comments
    
    enable rewrite/drop file in HiveMetadataWriter
    
    remove unintentional change
    
    address comments
    
    force to set schema literal
    
    Closes #3234 from ZihanLi58/GOBBLIN-1396
---
 .../org/apache/gobblin/writer/FsDataWriter.java    |   8 +-
 .../java/org/apache/gobblin/hive/HiveRegister.java |   4 +-
 .../hive/metastore/HiveMetaStoreBasedRegister.java |   3 +-
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 290 +++++++++++++++++++++
 .../gobblin/hive}/writer/MetadataWriter.java       |   5 +-
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   |  15 +-
 .../iceberg/writer/IcebergMetadataWriter.java      |   6 +-
 ...WriterTest.java => HiveMetadataWriterTest.java} | 123 +++++----
 ...terTest.java => IcebergMetadataWriterTest.java} |  43 +--
 9 files changed, 406 insertions(+), 91 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index d8060ef..7670b5b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -101,8 +101,8 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     JobConfigurationUtils.putStateIntoConfiguration(properties, conf);
     this.fs = WriterUtils.getWriterFS(properties, this.numBranches, 
this.branchId);
     this.fileContext = FileContext.getFileContext(
-            WriterUtils.getWriterFsUri(properties, this.numBranches, 
this.branchId),
-            conf);
+        WriterUtils.getWriterFsUri(properties, this.numBranches, 
this.branchId),
+        conf);
 
     // Initialize staging/output directory
     Path writerStagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils
@@ -131,7 +131,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
         ConfigurationKeys.DEFAULT_BUFFER_SIZE);
 
     this.replicationFactor = properties.getPropAsShort(ForkOperatorUtils
-        
.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR, 
this.numBranches, this.branchId),
+            
.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR, 
this.numBranches, this.branchId),
         this.fs.getDefaultReplication(this.outputFile));
 
     this.blockSize = properties.getPropAsLong(ForkOperatorUtils
@@ -275,7 +275,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
         ImmutableSet.of(new 
FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
     );
     this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
- }
+  }
 
   /**
    * {@inheritDoc}.
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index dc0b65c..1efde8c 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -322,7 +322,7 @@ public abstract class HiveRegister implements Closeable {
     }
   }
 
-  protected boolean needToUpdateTable(HiveTable existingTable, HiveTable 
newTable) {
+  public boolean needToUpdateTable(HiveTable existingTable, HiveTable 
newTable) {
     return getTableComparator(existingTable, newTable).compareAll().result();
   }
 
@@ -339,7 +339,7 @@ public abstract class HiveRegister implements Closeable {
     }
   }
 
-  protected boolean needToUpdatePartition(HivePartition existingPartition, 
HivePartition newPartition) {
+  public boolean needToUpdatePartition(HivePartition existingPartition, 
HivePartition newPartition) {
     return getPartitionComparator(existingPartition, 
newPartition).compareAll().result();
   }
 
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 26ad94a..06d2ec0 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -402,8 +402,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   @Deprecated
   @Override
   public boolean createTableIfNotExists(HiveTable table) throws IOException {
-    try (AutoReturnableObject<IMetaStoreClient> client = 
this.clientPool.getClient();
-        AutoCloseableHiveLock lock = 
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
+    try (AutoReturnableObject<IMetaStoreClient> client = 
this.clientPool.getClient()) {
       return createTableIfNotExists(client.get(), 
HiveMetaStoreUtils.getTable(table), table);
     }
   }
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
new file mode 100644
index 0000000..a4550bc
--- /dev/null
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.writer;
+
+import com.google.common.base.Joiner;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+
+/**
+ * This writer is used to register the hiveSpec into hive metaStore
+ * For add_files operation, this writer will use cache to determine whether 
the partition is registered already or need to be altered
+ * and then register the partition if needed
+ * For rewrite_files operation, this writer will directly register the new 
hive spec and try to de-register the old hive spec if oldFilePrefixes is set
+ * For drop_files operation, this writer will de-register the hive partition 
only if oldFilePrefixes is set in the GMCE
+ */
+@Slf4j
+public class HiveMetadataWriter implements MetadataWriter {
+
+  private static final String HIVE_REGISTRATION_WHITELIST = 
"hive.registration.whitelist";
+  private static final String HIVE_REGISTRATION_BLACKLIST = 
"hive.registration.blacklist";
+  private static final String HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 
"hive.registration.timeout.seconds";
+  private static final long DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 60;
+  private final Joiner tableNameJoiner = Joiner.on('.');
+  private final Closer closer = Closer.create();
+  protected final HiveRegister hiveRegister;
+  private final WhitelistBlacklist whiteistBlacklist;
+  @Getter
+  private final KafkaSchemaRegistry schemaRegistry;
+  private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>> 
currentExecutionMap;
+
+  private final HashMap<String, Cache<String, String>> schemaCreationTimeMap;
+  private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
+  private final HashMap<String, String> lastestSchemaMap;
+  private final long timeOutSeconds;
+  private State state;
+
+  public HiveMetadataWriter(State state) throws IOException {
+    this.state = state;
+    this.hiveRegister = this.closer.register(HiveRegister.get(state));
+    this.whiteistBlacklist = new 
WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
+        state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
+    this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
+    this.currentExecutionMap = new HashMap<>();
+    this.schemaCreationTimeMap = new HashMap<>();
+    this.specMaps = new HashMap<>();
+    this.lastestSchemaMap = new HashMap<>();
+    this.timeOutSeconds =
+        state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, 
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
+  }
+
+  @Override
+  public void flush(String dbName, String tableName) throws IOException {
+    String tableKey = tableNameJoiner.join(dbName, tableName);
+    log.info("start to flush table: " + tableKey);
+    HashMap<List<String>, ListenableFuture<Void>> executionMap =
+        this.currentExecutionMap.computeIfAbsent(tableKey, s -> new 
HashMap<>());
+    //iterator all execution to get the result to make sure they all succeeded
+    for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : 
executionMap.entrySet()) {
+      try {
+        execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
+        log.error("Error when getting the result of registration for table" + 
tableKey);
+        throw new RuntimeException(e);
+      }
+    }
+    executionMap.clear();
+    log.info("finish flushing table: " + tableKey);
+  }
+
+  public void write(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> newSpecsMap,
+      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) 
throws IOException {
+    String dbName = tableSpec.getTable().getDbName();
+    String tableName = tableSpec.getTable().getTableName();
+    String tableKey = tableNameJoiner.join(dbName, tableName);
+    if (!specMaps.containsKey(tableKey) || specMaps.get(tableKey).size() == 0) 
{
+      //Try to create table first to make sure table existed
+      this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+    }
+
+    //ToDo: after making sure all spec has topic.name set, we should use 
topicName as key for schema
+    if (!lastestSchemaMap.containsKey(tableKey)) {
+      HiveTable existingTable = this.hiveRegister.getTable(dbName, 
tableName).get();
+      lastestSchemaMap.put(tableKey,
+          
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+    }
+
+    //Calculate the topic name from gmce, fall back to topic.name in hive spec 
which can also be null
+    //todo: make topicName fall back to topic.name in hive spec so that we can 
also get schema for re-write operation
+    String topicName = null;
+    if (gmce.getTopicPartitionOffsetsRange() != null) {
+      String topicPartitionString = 
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name 
contains '-'
+      topicName = topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
+    }
+    switch (gmce.getOperationType()) {
+      case add_files: {
+        addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
+        break;
+      }
+      case drop_files: {
+        deleteFiles(gmce, oldSpecsMap, dbName, tableName);
+        break;
+      }
+      case rewrite_files: {
+        //de-register old partitions
+        deleteFiles(gmce, oldSpecsMap, dbName, tableName);
+        //register new partitions
+        addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
+        break;
+      }
+      default: {
+        log.error("unsupported operation {}", 
gmce.getOperationType().toString());
+        return;
+      }
+    }
+  }
+
+  public void deleteFiles(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> oldSpecsMap, String dbName,
+      String tableName) throws IOException {
+    if (gmce.getOldFilePrefixes() == null || 
gmce.getOldFilePrefixes().isEmpty()) {
+      //We only de-register partition when old file prefixes is set, since 
hive partition refer to a whole directory
+      return;
+    }
+    for (Collection<HiveSpec> specs : oldSpecsMap.values()) {
+      for (HiveSpec spec : specs) {
+        if (spec.getTable().getDbName().equals(dbName) && 
spec.getTable().getTableName().equals(tableName)) {
+          if (spec.getPartition().isPresent()) {
+            deRegisterPartitionHelper(dbName, tableName, spec);
+          }
+        }
+      }
+    }
+    //TODO: De-register table if table location does not exist (Configurable)
+
+  }
+
+  protected void deRegisterPartitionHelper(String dbName, String tableName, 
HiveSpec spec) throws IOException {
+    hiveRegister.dropPartitionIfExists(dbName, tableName, 
spec.getTable().getPartitionKeys(),
+        spec.getPartition().get().getValues());
+  }
+
+  public void addFiles(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> newSpecsMap, String dbName,
+      String tableName, String topicName) throws SchemaRegistryException {
+    String tableKey = tableNameJoiner.join(dbName, tableName);
+    for (Collection<HiveSpec> specs : newSpecsMap.values()) {
+      for (HiveSpec spec : specs) {
+        if (spec.getTable().getDbName().equals(dbName) && 
spec.getTable().getTableName().equals(tableName)) {
+          List<String> partitionValue =
+              spec.getPartition().isPresent() ? 
spec.getPartition().get().getValues() : Lists.newArrayList();
+          Cache<List<String>, HiveSpec> hiveSpecCache = 
specMaps.computeIfAbsent(tableKey,
+              s -> CacheBuilder.newBuilder()
+                  
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+                      MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), 
TimeUnit.HOURS)
+                  .build());
+          HiveSpec existedSpec = hiveSpecCache.getIfPresent(partitionValue);
+          if (existedSpec != null) {
+            //if existedSpec is not null, it means we already registered this 
partition, so check whether we need to update the table/partition
+            schemaUpdateHelper(gmce, spec, topicName, tableKey);
+            if ((this.hiveRegister.needToUpdateTable(existedSpec.getTable(), 
spec.getTable())) || (
+                spec.getPartition().isPresent() && 
this.hiveRegister.needToUpdatePartition(
+                    existedSpec.getPartition().get(), 
spec.getPartition().get()))) {
+              registerSpec(tableKey, partitionValue, spec, hiveSpecCache);
+            }
+          } else {
+            registerSpec(tableKey, partitionValue, spec, hiveSpecCache);
+          }
+        }
+      }
+    }
+  }
+
+  private void registerSpec(String tableKey, List<String> partitionValue, 
HiveSpec spec,
+      Cache<List<String>, HiveSpec> hiveSpecCache) {
+    HashMap<List<String>, ListenableFuture<Void>> executionMap =
+        this.currentExecutionMap.computeIfAbsent(tableKey, s -> new 
HashMap<>());
+    if (executionMap.containsKey(partitionValue)) {
+      try {
+        executionMap.get(partitionValue).get(timeOutSeconds, TimeUnit.SECONDS);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
+        log.error("Error when getting the result of registration for table" + 
tableKey);
+        throw new RuntimeException(e);
+      }
+    }
+    executionMap.put(partitionValue, this.hiveRegister.register(spec));
+    hiveSpecCache.put(partitionValue, spec);
+  }
+
+  private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec 
spec, String topicName, String tableKey)
+      throws SchemaRegistryException {
+    if (gmce.getSchemaSource() != SchemaSource.NONE) {
+      String newSchemaString =
+          
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+      if (newSchemaString != null) {
+        Schema newSchema = new Schema.Parser().parse(newSchemaString);
+        String newSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(newSchema);
+        Cache<String, String> existedSchemaCreationTimes = 
schemaCreationTimeMap.computeIfAbsent(tableKey,
+            s -> CacheBuilder.newBuilder()
+                .expireAfterAccess(
+                    state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME, 
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME),
+                    TimeUnit.HOURS)
+                .build());
+        if (gmce.getSchemaSource() == SchemaSource.EVENT) {
+          // Schema source is Event, update schema anyway
+          lastestSchemaMap.put(tableKey, newSchemaString);
+          // Clear the schema versions cache so next time if we see schema 
source is schemaRegistry, we will contact schemaRegistry and update
+          existedSchemaCreationTimes.cleanUp();
+        } else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY && 
newSchemaCreationTime != null
+            && existedSchemaCreationTimes.getIfPresent(newSchemaCreationTime) 
== null) {
+          // We haven't seen this schema before, so we query schemaRegistry to 
get latest schema
+          if (topicName != null && !topicName.isEmpty()) {
+            Schema latestSchema = (Schema) 
this.schemaRegistry.getLatestSchemaByTopic(topicName);
+            String latestCreationTime = 
AvroUtils.getSchemaCreationTime(latestSchema);
+            if (latestCreationTime.equals(newSchemaCreationTime)) {
+              //new schema is the latest schema, we update our record
+              lastestSchemaMap.put(tableKey, newSchemaString);
+            }
+            existedSchemaCreationTimes.put(newSchemaCreationTime, "");
+          }
+        }
+      }
+    }
+    //Force to set the schema even there is no schema literal defined in the 
spec
+    spec.getTable()
+        .getSerDeProps()
+        
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), 
lastestSchemaMap.get(tableKey));
+  }
+
+  @Override
+  public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, 
Map<String, Collection<HiveSpec>> newSpecsMap,
+      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) 
throws IOException {
+    GenericRecord genericRecord = recordEnvelope.getRecord();
+    GobblinMetadataChangeEvent gmce =
+        (GobblinMetadataChangeEvent) 
SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
+    if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName())) {
+      write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+    } else {
+      log.info(String.format("Skip table %s.%s since it's blacklisted", 
tableSpec.getTable().getDbName(),
+          tableSpec.getTable().getTableName()));
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.closer.close();
+  }
+}
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
similarity index 91%
rename from 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
rename to 
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 03a6d5b..e4a0406 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.iceberg.writer;
+package org.apache.gobblin.hive.writer;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -30,6 +30,9 @@ import org.apache.gobblin.stream.RecordEnvelope;
  * This is the interface of the writer which is used to calculate and 
accumulate the desired metadata and register to the metadata store
  */
 public interface MetadataWriter extends Closeable {
+  String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
+  int DEFAULT_CACHE_EXPIRING_TIME = 1;
+
   /*
   Register the metadata of specific table to the metadata store
    */
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 44c0f23..282e081 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -76,9 +77,7 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
   public static final String METADATA_REGISTRATION_THREADS = 
"metadata.registration.threads";
   public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = 
"metadata.parallel.runner.timeout.mills";
   public static final String HIVE_PARTITION_NAME = "hive.partition.name";
-  public static final String CACHE_EXPIRING_TIME = 
"GMCEWriter.cache.expiring.time.hours";
   public static final String GMCE_METADATA_WRITER_CLASSES = 
"gmce.metadata.writer.classes";
-  public static final int DEFAULT_CACHE_EXPIRING_TIME = 1;
   public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
   public static final String TABLE_NAME_DELIMITER = ".";
   @Getter
@@ -205,23 +204,23 @@ public class GobblinMCEWriter implements 
DataWriter<GenericRecord> {
       State registerState = setHiveRegProperties(state, gmce, true);
       
computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), 
dataFile -> dataFile.getFilePath())),
           newSpecsMap, newSpecsMaps.computeIfAbsent(datasetName, t -> 
CacheBuilder.newBuilder()
-              .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
-                  DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+              
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+                  MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
               .build()), registerState, false);
     }
     if (gmce.getOldFilePrefixes() != null) {
       State registerState = setHiveRegProperties(state, gmce, false);
       computeSpecMap(gmce.getOldFilePrefixes(), oldSpecsMap, 
oldSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder
           .newBuilder()
-          .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
-              DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+          
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+              MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
           .build()), registerState, true);
     } else if (gmce.getOldFiles() != null) {
       State registerState = setHiveRegProperties(state, gmce, false);
       computeSpecMap(gmce.getOldFiles(), oldSpecsMap, 
oldSpecsMaps.computeIfAbsent(datasetName,
           t -> CacheBuilder.newBuilder()
-              .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
-                  DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+              
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+                  MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
               .build()), registerState, false);
     }
     if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ed89f1a..e730408 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -103,7 +104,6 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveCatalogs;
-import org.apache.thrift.TException;
 import org.joda.time.DateTime;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
@@ -353,8 +353,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         tableMetadata.lastSchemaVersion.or(() -> 
props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME)));
     String lastSchemaVersion = tableMetadata.lastSchemaVersion.get();
     tableMetadata.candidateSchemas = 
Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder()
-        .expireAfterAccess(conf.getInt(GobblinMCEWriter.CACHE_EXPIRING_TIME,
-            GobblinMCEWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+        .expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
+            MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
         .build()));
     Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
     try {
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
similarity index 71%
copy from 
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
copy to 
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 4f1e156..0d13ecf 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -18,11 +18,13 @@
 package org.apache.gobblin.iceberg.writer;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.avro.SchemaBuilder;
@@ -32,10 +34,12 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.metadata.DataFile;
@@ -56,18 +60,24 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.iceberg.FindFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.thrift.TException;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class GobblinMCEWriterTest extends HiveMetastoreTest {
+public class HiveMetadataWriterTest extends HiveMetastoreTest {
 
   org.apache.avro.Schema avroDataSchema = SchemaBuilder.record("test")
       .fields()
@@ -92,9 +102,13 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest 
{
   static File hourlyDataFile_2;
   static File hourlyDataFile_1;
   static File dailyDataFile;
+  HiveMetastoreClientPool hc;
+  IMetaStoreClient client;
+  private static TestHiveMetastore testHiveMetastore;
 
   @AfterClass
   public void clean() throws Exception {
+    //Finally stop the metaStore
     stopMetastore();
     gobblinMCEWriter.close();
     FileUtils.forceDeleteOnExit(tmpDir);
@@ -105,8 +119,15 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
     startMetastore();
     State state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
     Optional<String> metastoreUri = 
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
-    HiveMetastoreClientPool hc = 
HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
+    hc = HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
+    client = hc.getClient().get();
     tmpDir = Files.createTempDir();
+    try {
+      client.getDatabase(dbName);
+    } catch (NoSuchObjectException e) {
+      client.createDatabase(
+          new Database(dbName, "database", tmpDir.getAbsolutePath() + 
"/metastore", Collections.emptyMap()));
+    }
     hourlyDataFile_1 = new File(tmpDir, 
"data/tracking/testTable/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
     hourlyDataFile_2 = new File(tmpDir, 
"data/tracking/testTable/hourly/2020/03/17/09/data.avro");
@@ -143,34 +164,22 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
     state.setProp("default.hive.registration.policy",
         TestHiveRegistrationPolicy.class.getName());
+    state.setProp("gmce.metadata.writer.classes", 
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), 
state);
-    ((IcebergMetadataWriter) 
gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
-        HiveMetastoreTest.catalog);
-    _avroPartitionSchema =
-        
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
   }
 
-  @Test
-  public void testWriteAddFileGMCE() throws IOException {
+  @Test( priority = 3 )
+  public void testHiveWriteAddFileGMCE() throws IOException {
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(10L))));
-    Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 1);
-    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "1000-2000").build());
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(20L))));
     gobblinMCEWriter.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-2000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 1);
-    // Assert low watermark and high watermark set properly
-    
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "9");
-    
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
+
 
     /*test flush twice*/
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "2000-3000").build());
@@ -184,26 +193,20 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(30L))));
     gobblinMCEWriter.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
-    
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
-    
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "30");
 
-    /* Test it will skip event with lower watermark*/
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "3000-4000").build());
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
-        new KafkaStreamingExtractor.KafkaWatermark(
-            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
-            new LongWatermark(30L))));
-    gobblinMCEWriter.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+    //Test that the Hive writer can register the table and its partitions
+    try {
+      Assert.assertTrue(client.tableExists("hivedb", "testTable"));
+      Assert.assertTrue(client.getPartition("hivedb", 
"testTable",Lists.newArrayList("2020-03-17-09")) != null);
+      Assert.assertTrue(client.getPartition("hivedb", 
"testTable",Lists.newArrayList("2020-03-17-08")) != null);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+
   }
 
-  @Test(dependsOnMethods = {"testWriteAddFileGMCE"})
-  public void testWriteRewriteFileGMCE() throws IOException {
+  @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"})
+  public void testHiveWriteRewriteFileGMCE() throws IOException {
     gmce.setTopicPartitionOffsetsRange(null);
     FileSystem fs = FileSystem.get(new Configuration());
     String filePath = new 
Path(hourlyDataFile_1.getParentFile().getAbsolutePath()).toString();
@@ -216,25 +219,23 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
     gmce.setNewFiles(Lists.newArrayList(dailyFile));
     gmce.setOldFilePrefixes(Lists.newArrayList(filePath, filePath_1));
     gmce.setOperationType(OperationType.rewrite_files);
-    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Iterator<org.apache.iceberg.DataFile>
-        result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
filePath_1)).collect().iterator();
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
-    Assert.assertTrue(result.hasNext());
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(40L))));
     gobblinMCEWriter.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    String dailyFilePath = new Path(dailyDataFile.toString()).toString();
-    result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
dailyFilePath)).collect().iterator();
-    Assert.assertEquals(result.next().path(), dailyFilePath);
-    Assert.assertFalse(result.hasNext());
-    result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
filePath)).collect().iterator();
-    Assert.assertFalse(result.hasNext());
-    result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
filePath_1)).collect().iterator();
-    Assert.assertFalse(result.hasNext());
+
+    //Test that the Hive writer's rewrite operation can de-register the old 
partitions and register the new one
+    try {
+       Assert.assertTrue(client.getPartition("hivedb", 
"testTable",Lists.newArrayList("2020-03-17-00")) != null);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    Assert.assertThrows(new Assert.ThrowingRunnable() {
+      @Override public void run() throws Throwable {
+        client.getPartition("hivedb", 
"testTable",Lists.newArrayList("2020-03-17-08"));
+      }
+    });
   }
 
   private String writeRecord(File file) throws IOException {
@@ -256,8 +257,28 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
       super(props);
     }
     protected Optional<HivePartition> getPartition(Path path, HiveTable table) 
throws IOException {
-      return Optional.of(new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList("2020-03-17-03"))
-          .withDbName("hivedb").withTableName("testTable").build());
+      String partitionValue = "";
+      if (path.toString().contains("hourly/2020/03/17/08")) {
+        partitionValue = "2020-03-17-08";
+      } else if (path.toString().contains("hourly/2020/03/17/09")) {
+        partitionValue = "2020-03-17-09";
+      } else if (path.toString().contains("daily/2020/03/17")) {
+        partitionValue = "2020-03-17-00";
+      }
+      HivePartition partition = new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
+          .withDbName("hivedb").withTableName("testTable").build();
+      partition.setLocation(path.toString());
+      return Optional.of(partition);
+    }
+    @Override
+    protected List<HiveTable> getTables(Path path) throws IOException {
+      List<HiveTable> tables = super.getTables(path);
+      for (HiveTable table : tables) {
+        table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+            new HiveRegistrationUnit.Column("testpartition", 
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
+        table.setLocation(tmpDir.getAbsolutePath());
+      }
+      return tables;
     }
     protected Iterable<String> getDatabaseNames(Path path) {
       return Lists.newArrayList("hivedb");
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
similarity index 89%
rename from 
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
rename to 
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 4f1e156..88f72f9 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -33,9 +33,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.hive.HivePartition;
-import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.metadata.DataFile;
@@ -67,7 +65,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class GobblinMCEWriterTest extends HiveMetastoreTest {
+public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
   org.apache.avro.Schema avroDataSchema = SchemaBuilder.record("test")
       .fields()
@@ -82,7 +80,6 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest {
       .endRecord();
   org.apache.avro.Schema _avroPartitionSchema;
   private String dbName = "hivedb";
-  private String tableName = "testTable";
 
   private GobblinMCEWriter gobblinMCEWriter;
 
@@ -95,7 +92,6 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest {
 
   @AfterClass
   public void clean() throws Exception {
-    stopMetastore();
     gobblinMCEWriter.close();
     FileUtils.forceDeleteOnExit(tmpDir);
   }
@@ -104,14 +100,12 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
     startMetastore();
     State state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
-    Optional<String> metastoreUri = 
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
-    HiveMetastoreClientPool hc = 
HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
     tmpDir = Files.createTempDir();
-    hourlyDataFile_1 = new File(tmpDir, 
"data/tracking/testTable/hourly/2020/03/17/08/data.avro");
+    hourlyDataFile_1 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
-    hourlyDataFile_2 = new File(tmpDir, 
"data/tracking/testTable/hourly/2020/03/17/09/data.avro");
+    hourlyDataFile_2 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2020/03/17/09/data.avro");
     Files.createParentDirs(hourlyDataFile_2);
-    dailyDataFile = new File(tmpDir, 
"data/tracking/testTable/daily/2020/03/17/data.avro");
+    dailyDataFile = new File(tmpDir, 
"data/tracking/testIcebergTable/daily/2020/03/17/data.avro");
     Files.createParentDirs(dailyDataFile);
     dataDir = new File(hourlyDataFile_1.getParent());
     Assert.assertTrue(dataDir.exists());
@@ -122,7 +116,7 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest 
{
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
-            .setNativeName("/data/tracking/testTable")
+            .setNativeName("/data/tracking/testIcebergTable")
             .build())
         .setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "0-1000").build())
         .setFlowId("testFlow")
@@ -136,13 +130,13 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
         .setTableSchema(avroDataSchema.toString())
         .setCluster(ClustersNames.getInstance().getClusterName())
         .setPartitionColumns(Lists.newArrayList("testpartition"))
-        .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
+        
.setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
         .setRegistrationProperties(ImmutableMap.<String, 
String>builder().put("hive.database.name", dbName).build())
         .build();
     state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
     state.setProp("default.hive.registration.policy",
-        TestHiveRegistrationPolicy.class.getName());
+        TestHiveRegistrationPolicyForIceberg.class.getName());
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), 
state);
     ((IcebergMetadataWriter) 
gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
         HiveMetastoreTest.catalog);
@@ -150,7 +144,7 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest 
{
         
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
   }
 
-  @Test
+  @Test ( priority = 1 )
   public void testWriteAddFileGMCE() throws IOException {
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
@@ -202,7 +196,8 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest 
{
     Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
   }
 
-  @Test(dependsOnMethods = {"testWriteAddFileGMCE"})
+  //Make sure the Hive test executes later, since it is the one that closes the metastore
+  @Test( priority = 2 )
   public void testWriteRewriteFileGMCE() throws IOException {
     gmce.setTopicPartitionOffsetsRange(null);
     FileSystem fs = FileSystem.get(new Configuration());
@@ -250,20 +245,28 @@ public class GobblinMCEWriterTest extends 
HiveMetastoreTest {
     return path;
   }
 
-  public static class TestHiveRegistrationPolicy extends 
HiveRegistrationPolicyBase {
+  public static class TestHiveRegistrationPolicyForIceberg extends 
HiveRegistrationPolicyBase {
 
-    public TestHiveRegistrationPolicy(State props) throws IOException {
+    public TestHiveRegistrationPolicyForIceberg(State props) throws 
IOException {
       super(props);
     }
     protected Optional<HivePartition> getPartition(Path path, HiveTable table) 
throws IOException {
-      return Optional.of(new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList("2020-03-17-03"))
-          .withDbName("hivedb").withTableName("testTable").build());
+      String partitionValue = "";
+      if (path.toString().contains("hourly/2020/03/17/08")) {
+        partitionValue = "2020-03-17-08";
+      } else if (path.toString().contains("hourly/2020/03/17/09")) {
+        partitionValue = "2020-03-17-09";
+      } else if (path.toString().contains("daily/2020/03/17")) {
+        partitionValue = "2020-03-17-00";
+      }
+      return Optional.of(new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
+          .withDbName("hivedb").withTableName("testIcebergTable").build());
     }
     protected Iterable<String> getDatabaseNames(Path path) {
       return Lists.newArrayList("hivedb");
     }
     protected List<String> getTableNames(Optional<String> dbPrefix, Path path) 
{
-      return Lists.newArrayList("testTable");
+      return Lists.newArrayList("testIcebergTable");
     }
   }
 }

Reply via email to