This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d9ae535 [GOBBLIN-1396] Enable HiveWriter to consume gmce and register
into hive MetadataStore
d9ae535 is described below
commit d9ae5353c74fdcd385835fca9b586b3fdb90971b
Author: Zihan Li <[email protected]>
AuthorDate: Tue Mar 23 15:21:10 2021 -0700
[GOBBLIN-1396] Enable HiveWriter to consume gmce and register into hive
MetadataStore
[GOBBLIN-1396]Enable HiveWriter to consume gmce
and register into hive MetadataStore
bug fix
add default branch to pass styleCheck
bug fix to avoid deadlock
address comments
address comments
enable rewrite/drop file in HiveMetadataWriter
remove unintentional change
address comments
force to set schema literal
Closes #3234 from ZihanLi58/GOBBLIN-1396
---
.../org/apache/gobblin/writer/FsDataWriter.java | 8 +-
.../java/org/apache/gobblin/hive/HiveRegister.java | 4 +-
.../hive/metastore/HiveMetaStoreBasedRegister.java | 3 +-
.../gobblin/hive/writer/HiveMetadataWriter.java | 290 +++++++++++++++++++++
.../gobblin/hive}/writer/MetadataWriter.java | 5 +-
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 15 +-
.../iceberg/writer/IcebergMetadataWriter.java | 6 +-
...WriterTest.java => HiveMetadataWriterTest.java} | 123 +++++----
...terTest.java => IcebergMetadataWriterTest.java} | 43 +--
9 files changed, 406 insertions(+), 91 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index d8060ef..7670b5b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -101,8 +101,8 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
JobConfigurationUtils.putStateIntoConfiguration(properties, conf);
this.fs = WriterUtils.getWriterFS(properties, this.numBranches,
this.branchId);
this.fileContext = FileContext.getFileContext(
- WriterUtils.getWriterFsUri(properties, this.numBranches,
this.branchId),
- conf);
+ WriterUtils.getWriterFsUri(properties, this.numBranches,
this.branchId),
+ conf);
// Initialize staging/output directory
Path writerStagingDir = this.writerAttemptIdOptional.isPresent() ?
WriterUtils
@@ -131,7 +131,7 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
this.replicationFactor = properties.getPropAsShort(ForkOperatorUtils
-
.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR,
this.numBranches, this.branchId),
+
.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR,
this.numBranches, this.branchId),
this.fs.getDefaultReplication(this.outputFile));
this.blockSize = properties.getPropAsLong(ForkOperatorUtils
@@ -275,7 +275,7 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
ImmutableSet.of(new
FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten()))
);
this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson());
- }
+ }
/**
* {@inheritDoc}.
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index dc0b65c..1efde8c 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -322,7 +322,7 @@ public abstract class HiveRegister implements Closeable {
}
}
- protected boolean needToUpdateTable(HiveTable existingTable, HiveTable
newTable) {
+ public boolean needToUpdateTable(HiveTable existingTable, HiveTable
newTable) {
return getTableComparator(existingTable, newTable).compareAll().result();
}
@@ -339,7 +339,7 @@ public abstract class HiveRegister implements Closeable {
}
}
- protected boolean needToUpdatePartition(HivePartition existingPartition,
HivePartition newPartition) {
+ public boolean needToUpdatePartition(HivePartition existingPartition,
HivePartition newPartition) {
return getPartitionComparator(existingPartition,
newPartition).compareAll().result();
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 26ad94a..06d2ec0 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -402,8 +402,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
@Deprecated
@Override
public boolean createTableIfNotExists(HiveTable table) throws IOException {
- try (AutoReturnableObject<IMetaStoreClient> client =
this.clientPool.getClient();
- AutoCloseableHiveLock lock =
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
+ try (AutoReturnableObject<IMetaStoreClient> client =
this.clientPool.getClient()) {
return createTableIfNotExists(client.get(),
HiveMetaStoreUtils.getTable(table), table);
}
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
new file mode 100644
index 0000000..a4550bc
--- /dev/null
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.writer;
+
+import com.google.common.base.Joiner;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+
+/**
+ * This writer is used to register the hiveSpec into hive metaStore
+ * For add_files operation, this writer will use a cache to determine whether
the partition is already registered or needs to be altered
+ * and then register the partition if needed
+ * For rewrite_files operation, this writer will directly register the new
hive spec and try to de-register the old hive spec if oldFilePrefixes is set
+ * For drop_files operation, this writer will de-register the hive partition
only if oldFilePrefixes is set in the GMCE
+ */
+@Slf4j
+public class HiveMetadataWriter implements MetadataWriter {
+
+ private static final String HIVE_REGISTRATION_WHITELIST =
"hive.registration.whitelist";
+ private static final String HIVE_REGISTRATION_BLACKLIST =
"hive.registration.blacklist";
+ private static final String HIVE_REGISTRATION_TIMEOUT_IN_SECONDS =
"hive.registration.timeout.seconds";
+ private static final long DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 60;
+ private final Joiner tableNameJoiner = Joiner.on('.');
+ private final Closer closer = Closer.create();
+ protected final HiveRegister hiveRegister;
+ private final WhitelistBlacklist whiteistBlacklist;
+ @Getter
+ private final KafkaSchemaRegistry schemaRegistry;
+ private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>>
currentExecutionMap;
+
+ private final HashMap<String, Cache<String, String>> schemaCreationTimeMap;
+ private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
+ private final HashMap<String, String> lastestSchemaMap;
+ private final long timeOutSeconds;
+ private State state;
+
+ public HiveMetadataWriter(State state) throws IOException {
+ this.state = state;
+ this.hiveRegister = this.closer.register(HiveRegister.get(state));
+ this.whiteistBlacklist = new
WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
+ state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
+ this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
+ this.currentExecutionMap = new HashMap<>();
+ this.schemaCreationTimeMap = new HashMap<>();
+ this.specMaps = new HashMap<>();
+ this.lastestSchemaMap = new HashMap<>();
+ this.timeOutSeconds =
+ state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS,
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
+ }
+
+ @Override
+ public void flush(String dbName, String tableName) throws IOException {
+ String tableKey = tableNameJoiner.join(dbName, tableName);
+ log.info("start to flush table: " + tableKey);
+ HashMap<List<String>, ListenableFuture<Void>> executionMap =
+ this.currentExecutionMap.computeIfAbsent(tableKey, s -> new
HashMap<>());
+    //iterate over all executions and get their results to make sure they all succeeded
+ for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution :
executionMap.entrySet()) {
+ try {
+ execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e)
{
+ log.error("Error when getting the result of registration for table" +
tableKey);
+ throw new RuntimeException(e);
+ }
+ }
+ executionMap.clear();
+ log.info("finish flushing table: " + tableKey);
+ }
+
+ public void write(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
+ Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec)
throws IOException {
+ String dbName = tableSpec.getTable().getDbName();
+ String tableName = tableSpec.getTable().getTableName();
+ String tableKey = tableNameJoiner.join(dbName, tableName);
+ if (!specMaps.containsKey(tableKey) || specMaps.get(tableKey).size() == 0)
{
+      //Try to create the table first to make sure the table exists
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ }
+
+ //ToDo: after making sure all spec has topic.name set, we should use
topicName as key for schema
+ if (!lastestSchemaMap.containsKey(tableKey)) {
+ HiveTable existingTable = this.hiveRegister.getTable(dbName,
tableName).get();
+ lastestSchemaMap.put(tableKey,
+
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ }
+
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ switch (gmce.getOperationType()) {
+ case add_files: {
+ addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
+ break;
+ }
+ case drop_files: {
+ deleteFiles(gmce, oldSpecsMap, dbName, tableName);
+ break;
+ }
+ case rewrite_files: {
+ //de-register old partitions
+ deleteFiles(gmce, oldSpecsMap, dbName, tableName);
+ //register new partitions
+ addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
+ break;
+ }
+ default: {
+ log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ return;
+ }
+ }
+ }
+
+ public void deleteFiles(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> oldSpecsMap, String dbName,
+ String tableName) throws IOException {
+ if (gmce.getOldFilePrefixes() == null ||
gmce.getOldFilePrefixes().isEmpty()) {
+      //We only de-register a partition when old file prefixes are set, since
a hive partition refers to a whole directory
+ return;
+ }
+ for (Collection<HiveSpec> specs : oldSpecsMap.values()) {
+ for (HiveSpec spec : specs) {
+ if (spec.getTable().getDbName().equals(dbName) &&
spec.getTable().getTableName().equals(tableName)) {
+ if (spec.getPartition().isPresent()) {
+ deRegisterPartitionHelper(dbName, tableName, spec);
+ }
+ }
+ }
+ }
+ //TODO: De-register table if table location does not exist (Configurable)
+
+ }
+
+ protected void deRegisterPartitionHelper(String dbName, String tableName,
HiveSpec spec) throws IOException {
+ hiveRegister.dropPartitionIfExists(dbName, tableName,
spec.getTable().getPartitionKeys(),
+ spec.getPartition().get().getValues());
+ }
+
+ public void addFiles(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap, String dbName,
+ String tableName, String topicName) throws SchemaRegistryException {
+ String tableKey = tableNameJoiner.join(dbName, tableName);
+ for (Collection<HiveSpec> specs : newSpecsMap.values()) {
+ for (HiveSpec spec : specs) {
+ if (spec.getTable().getDbName().equals(dbName) &&
spec.getTable().getTableName().equals(tableName)) {
+ List<String> partitionValue =
+ spec.getPartition().isPresent() ?
spec.getPartition().get().getValues() : Lists.newArrayList();
+ Cache<List<String>, HiveSpec> hiveSpecCache =
specMaps.computeIfAbsent(tableKey,
+ s -> CacheBuilder.newBuilder()
+
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+ MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME),
TimeUnit.HOURS)
+ .build());
+ HiveSpec existedSpec = hiveSpecCache.getIfPresent(partitionValue);
+ if (existedSpec != null) {
+ //if existedSpec is not null, it means we already registered this
partition, so check whether we need to update the table/partition
+ schemaUpdateHelper(gmce, spec, topicName, tableKey);
+ if ((this.hiveRegister.needToUpdateTable(existedSpec.getTable(),
spec.getTable())) || (
+ spec.getPartition().isPresent() &&
this.hiveRegister.needToUpdatePartition(
+ existedSpec.getPartition().get(),
spec.getPartition().get()))) {
+ registerSpec(tableKey, partitionValue, spec, hiveSpecCache);
+ }
+ } else {
+ registerSpec(tableKey, partitionValue, spec, hiveSpecCache);
+ }
+ }
+ }
+ }
+ }
+
+ private void registerSpec(String tableKey, List<String> partitionValue,
HiveSpec spec,
+ Cache<List<String>, HiveSpec> hiveSpecCache) {
+ HashMap<List<String>, ListenableFuture<Void>> executionMap =
+ this.currentExecutionMap.computeIfAbsent(tableKey, s -> new
HashMap<>());
+ if (executionMap.containsKey(partitionValue)) {
+ try {
+ executionMap.get(partitionValue).get(timeOutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e)
{
+ log.error("Error when getting the result of registration for table" +
tableKey);
+ throw new RuntimeException(e);
+ }
+ }
+ executionMap.put(partitionValue, this.hiveRegister.register(spec));
+ hiveSpecCache.put(partitionValue, spec);
+ }
+
+ private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec
spec, String topicName, String tableKey)
+ throws SchemaRegistryException {
+ if (gmce.getSchemaSource() != SchemaSource.NONE) {
+ String newSchemaString =
+
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+ if (newSchemaString != null) {
+ Schema newSchema = new Schema.Parser().parse(newSchemaString);
+ String newSchemaCreationTime =
AvroUtils.getSchemaCreationTime(newSchema);
+ Cache<String, String> existedSchemaCreationTimes =
schemaCreationTimeMap.computeIfAbsent(tableKey,
+ s -> CacheBuilder.newBuilder()
+ .expireAfterAccess(
+ state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME),
+ TimeUnit.HOURS)
+ .build());
+ if (gmce.getSchemaSource() == SchemaSource.EVENT) {
+ // Schema source is Event, update schema anyway
+ lastestSchemaMap.put(tableKey, newSchemaString);
+          // Clear the schema-versions cache so that the next time the schema
source is schemaRegistry, we will contact the registry and update
+ existedSchemaCreationTimes.cleanUp();
+ } else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY &&
newSchemaCreationTime != null
+ && existedSchemaCreationTimes.getIfPresent(newSchemaCreationTime)
== null) {
+ // We haven't seen this schema before, so we query schemaRegistry to
get latest schema
+ if (topicName != null && !topicName.isEmpty()) {
+ Schema latestSchema = (Schema)
this.schemaRegistry.getLatestSchemaByTopic(topicName);
+ String latestCreationTime =
AvroUtils.getSchemaCreationTime(latestSchema);
+ if (latestCreationTime.equals(newSchemaCreationTime)) {
+ //new schema is the latest schema, we update our record
+ lastestSchemaMap.put(tableKey, newSchemaString);
+ }
+ existedSchemaCreationTimes.put(newSchemaCreationTime, "");
+ }
+ }
+ }
+ }
+    //Force-set the schema even if there is no schema literal defined in the
spec
+ spec.getTable()
+ .getSerDeProps()
+
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
lastestSchemaMap.get(tableKey));
+ }
+
+ @Override
+ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope,
Map<String, Collection<HiveSpec>> newSpecsMap,
+ Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec)
throws IOException {
+ GenericRecord genericRecord = recordEnvelope.getRecord();
+ GobblinMetadataChangeEvent gmce =
+ (GobblinMetadataChangeEvent)
SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
+ if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
+ write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+ } else {
+ log.info(String.format("Skip table %s.%s since it's blacklisted",
tableSpec.getTable().getDbName(),
+ tableSpec.getTable().getTableName()));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.closer.close();
+ }
+}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
similarity index 91%
rename from
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
rename to
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 03a6d5b..e4a0406 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/MetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.iceberg.writer;
+package org.apache.gobblin.hive.writer;
import java.io.Closeable;
import java.io.IOException;
@@ -30,6 +30,9 @@ import org.apache.gobblin.stream.RecordEnvelope;
* This is the interface of the writer which is used to calculate and
accumulate the desired metadata and register to the metadata store
*/
public interface MetadataWriter extends Closeable {
+ String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
+ int DEFAULT_CACHE_EXPIRING_TIME = 1;
+
/*
Register the metadata of specific table to the metadata store
*/
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 44c0f23..282e081 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -76,9 +77,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
public static final String METADATA_REGISTRATION_THREADS =
"metadata.registration.threads";
public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS =
"metadata.parallel.runner.timeout.mills";
public static final String HIVE_PARTITION_NAME = "hive.partition.name";
- public static final String CACHE_EXPIRING_TIME =
"GMCEWriter.cache.expiring.time.hours";
public static final String GMCE_METADATA_WRITER_CLASSES =
"gmce.metadata.writer.classes";
- public static final int DEFAULT_CACHE_EXPIRING_TIME = 1;
public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
public static final String TABLE_NAME_DELIMITER = ".";
@Getter
@@ -205,23 +204,23 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
State registerState = setHiveRegProperties(state, gmce, true);
computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(),
dataFile -> dataFile.getFilePath())),
newSpecsMap, newSpecsMaps.computeIfAbsent(datasetName, t ->
CacheBuilder.newBuilder()
- .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
- DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+ MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()), registerState, false);
}
if (gmce.getOldFilePrefixes() != null) {
State registerState = setHiveRegProperties(state, gmce, false);
computeSpecMap(gmce.getOldFilePrefixes(), oldSpecsMap,
oldSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder
.newBuilder()
- .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
- DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+ MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()), registerState, true);
} else if (gmce.getOldFiles() != null) {
State registerState = setHiveRegProperties(state, gmce, false);
computeSpecMap(gmce.getOldFiles(), oldSpecsMap,
oldSpecsMaps.computeIfAbsent(datasetName,
t -> CacheBuilder.newBuilder()
- .expireAfterAccess(state.getPropAsInt(CACHE_EXPIRING_TIME,
- DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
+ MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()), registerState, false);
}
if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ed89f1a..e730408 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -103,7 +104,6 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
-import org.apache.thrift.TException;
import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
@@ -353,8 +353,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
tableMetadata.lastSchemaVersion.or(() ->
props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME)));
String lastSchemaVersion = tableMetadata.lastSchemaVersion.get();
tableMetadata.candidateSchemas =
Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder()
- .expireAfterAccess(conf.getInt(GobblinMCEWriter.CACHE_EXPIRING_TIME,
- GobblinMCEWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+ .expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
+ MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()));
Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
try {
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
similarity index 71%
copy from
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
copy to
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 4f1e156..0d13ecf 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.iceberg.writer;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.SchemaBuilder;
@@ -32,10 +34,12 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.metadata.DataFile;
@@ -56,18 +60,24 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.thrift.TException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class GobblinMCEWriterTest extends HiveMetastoreTest {
+public class HiveMetadataWriterTest extends HiveMetastoreTest {
org.apache.avro.Schema avroDataSchema = SchemaBuilder.record("test")
.fields()
@@ -92,9 +102,13 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest
{
static File hourlyDataFile_2;
static File hourlyDataFile_1;
static File dailyDataFile;
+ HiveMetastoreClientPool hc;
+ IMetaStoreClient client;
+ private static TestHiveMetastore testHiveMetastore;
@AfterClass
public void clean() throws Exception {
+ //Finally stop the metaStore
stopMetastore();
gobblinMCEWriter.close();
FileUtils.forceDeleteOnExit(tmpDir);
@@ -105,8 +119,15 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
startMetastore();
State state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
Optional<String> metastoreUri =
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
- HiveMetastoreClientPool hc =
HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
+ hc = HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
+ client = hc.getClient().get();
tmpDir = Files.createTempDir();
+ try {
+ client.getDatabase(dbName);
+ } catch (NoSuchObjectException e) {
+ client.createDatabase(
+ new Database(dbName, "database", tmpDir.getAbsolutePath() +
"/metastore", Collections.emptyMap()));
+ }
hourlyDataFile_1 = new File(tmpDir,
"data/tracking/testTable/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
hourlyDataFile_2 = new File(tmpDir,
"data/tracking/testTable/hourly/2020/03/17/09/data.avro");
@@ -143,34 +164,22 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy",
TestHiveRegistrationPolicy.class.getName());
+ state.setProp("gmce.metadata.writer.classes",
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
- ((IcebergMetadataWriter)
gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
- HiveMetastoreTest.catalog);
- _avroPartitionSchema =
-
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
}
- @Test
- public void testWriteAddFileGMCE() throws IOException {
+ @Test( priority = 3 )
+ public void testHiveWriteAddFileGMCE() throws IOException {
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(10L))));
- Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 1);
- Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "1000-2000").build());
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(20L))));
gobblinMCEWriter.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-2000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 1);
- // Assert low watermark and high watermark set properly
-
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"9");
-
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
+
/*test flush twice*/
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "2000-3000").build());
@@ -184,26 +193,20 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(30L))));
gobblinMCEWriter.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
-
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
-
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"30");
- /* Test it will skip event with lower watermark*/
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "3000-4000").build());
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
- new KafkaStreamingExtractor.KafkaWatermark(
- new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
- new LongWatermark(30L))));
- gobblinMCEWriter.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+ //Test Hive writer can register partition
+ try {
+ Assert.assertTrue(client.tableExists("hivedb", "testTable"));
+ Assert.assertTrue(client.getPartition("hivedb",
"testTable",Lists.newArrayList("2020-03-17-09")) != null);
+ Assert.assertTrue(client.getPartition("hivedb",
"testTable",Lists.newArrayList("2020-03-17-08")) != null);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+
}
- @Test(dependsOnMethods = {"testWriteAddFileGMCE"})
- public void testWriteRewriteFileGMCE() throws IOException {
+ @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"})
+ public void testHiveWriteRewriteFileGMCE() throws IOException {
gmce.setTopicPartitionOffsetsRange(null);
FileSystem fs = FileSystem.get(new Configuration());
String filePath = new
Path(hourlyDataFile_1.getParentFile().getAbsolutePath()).toString();
@@ -216,25 +219,23 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
gmce.setNewFiles(Lists.newArrayList(dailyFile));
gmce.setOldFilePrefixes(Lists.newArrayList(filePath, filePath_1));
gmce.setOperationType(OperationType.rewrite_files);
- Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Iterator<org.apache.iceberg.DataFile>
- result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
filePath_1)).collect().iterator();
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
- Assert.assertTrue(result.hasNext());
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(40L))));
gobblinMCEWriter.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- String dailyFilePath = new Path(dailyDataFile.toString()).toString();
- result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
dailyFilePath)).collect().iterator();
- Assert.assertEquals(result.next().path(), dailyFilePath);
- Assert.assertFalse(result.hasNext());
- result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
filePath)).collect().iterator();
- Assert.assertFalse(result.hasNext());
- result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
filePath_1)).collect().iterator();
- Assert.assertFalse(result.hasNext());
+
+ //Test hive writer re-write operation can de-register old partitions and
register new one
+ try {
+ Assert.assertTrue(client.getPartition("hivedb",
"testTable",Lists.newArrayList("2020-03-17-00")) != null);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ Assert.assertThrows(new Assert.ThrowingRunnable() {
+ @Override public void run() throws Throwable {
+ client.getPartition("hivedb",
"testTable",Lists.newArrayList("2020-03-17-08"));
+ }
+ });
}
private String writeRecord(File file) throws IOException {
@@ -256,8 +257,28 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
super(props);
}
protected Optional<HivePartition> getPartition(Path path, HiveTable table)
throws IOException {
- return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList("2020-03-17-03"))
- .withDbName("hivedb").withTableName("testTable").build());
+ String partitionValue = "";
+ if (path.toString().contains("hourly/2020/03/17/08")) {
+ partitionValue = "2020-03-17-08";
+ } else if (path.toString().contains("hourly/2020/03/17/09")) {
+ partitionValue = "2020-03-17-09";
+ } else if (path.toString().contains("daily/2020/03/17")) {
+ partitionValue = "2020-03-17-00";
+ }
+ HivePartition partition = new
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
+ .withDbName("hivedb").withTableName("testTable").build();
+ partition.setLocation(path.toString());
+ return Optional.of(partition);
+ }
+ @Override
+ protected List<HiveTable> getTables(Path path) throws IOException {
+ List<HiveTable> tables = super.getTables(path);
+ for (HiveTable table : tables) {
+ table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+ new HiveRegistrationUnit.Column("testpartition",
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
+ table.setLocation(tmpDir.getAbsolutePath());
+ }
+ return tables;
}
protected Iterable<String> getDatabaseNames(Path path) {
return Lists.newArrayList("hivedb");
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
similarity index 89%
rename from
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
rename to
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 4f1e156..88f72f9 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -33,9 +33,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.HivePartition;
-import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.metadata.DataFile;
@@ -67,7 +65,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class GobblinMCEWriterTest extends HiveMetastoreTest {
+public class IcebergMetadataWriterTest extends HiveMetastoreTest {
org.apache.avro.Schema avroDataSchema = SchemaBuilder.record("test")
.fields()
@@ -82,7 +80,6 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest {
.endRecord();
org.apache.avro.Schema _avroPartitionSchema;
private String dbName = "hivedb";
- private String tableName = "testTable";
private GobblinMCEWriter gobblinMCEWriter;
@@ -95,7 +92,6 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest {
@AfterClass
public void clean() throws Exception {
- stopMetastore();
gobblinMCEWriter.close();
FileUtils.forceDeleteOnExit(tmpDir);
}
@@ -104,14 +100,12 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
startMetastore();
State state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
- Optional<String> metastoreUri =
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
- HiveMetastoreClientPool hc =
HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
tmpDir = Files.createTempDir();
- hourlyDataFile_1 = new File(tmpDir,
"data/tracking/testTable/hourly/2020/03/17/08/data.avro");
+ hourlyDataFile_1 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
- hourlyDataFile_2 = new File(tmpDir,
"data/tracking/testTable/hourly/2020/03/17/09/data.avro");
+ hourlyDataFile_2 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2020/03/17/09/data.avro");
Files.createParentDirs(hourlyDataFile_2);
- dailyDataFile = new File(tmpDir,
"data/tracking/testTable/daily/2020/03/17/data.avro");
+ dailyDataFile = new File(tmpDir,
"data/tracking/testIcebergTable/daily/2020/03/17/data.avro");
Files.createParentDirs(dailyDataFile);
dataDir = new File(hourlyDataFile_1.getParent());
Assert.assertTrue(dataDir.exists());
@@ -122,7 +116,7 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest
{
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
.setDataPlatformUrn("urn:li:dataPlatform:hdfs")
- .setNativeName("/data/tracking/testTable")
+ .setNativeName("/data/tracking/testIcebergTable")
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -136,13 +130,13 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
.setTableSchema(avroDataSchema.toString())
.setCluster(ClustersNames.getInstance().getClusterName())
.setPartitionColumns(Lists.newArrayList("testpartition"))
- .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
+
.setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
.setRegistrationProperties(ImmutableMap.<String,
String>builder().put("hive.database.name", dbName).build())
.build();
state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy",
- TestHiveRegistrationPolicy.class.getName());
+ TestHiveRegistrationPolicyForIceberg.class.getName());
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
((IcebergMetadataWriter)
gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
@@ -150,7 +144,7 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest
{
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
}
- @Test
+ @Test ( priority = 1 )
public void testWriteAddFileGMCE() throws IOException {
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
@@ -202,7 +196,8 @@ public class GobblinMCEWriterTest extends HiveMetastoreTest
{
Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
}
- @Test(dependsOnMethods = {"testWriteAddFileGMCE"})
+ //Make sure hive test execute later and close the metastore
+ @Test( priority = 2 )
public void testWriteRewriteFileGMCE() throws IOException {
gmce.setTopicPartitionOffsetsRange(null);
FileSystem fs = FileSystem.get(new Configuration());
@@ -250,20 +245,28 @@ public class GobblinMCEWriterTest extends
HiveMetastoreTest {
return path;
}
- public static class TestHiveRegistrationPolicy extends
HiveRegistrationPolicyBase {
+ public static class TestHiveRegistrationPolicyForIceberg extends
HiveRegistrationPolicyBase {
- public TestHiveRegistrationPolicy(State props) throws IOException {
+ public TestHiveRegistrationPolicyForIceberg(State props) throws
IOException {
super(props);
}
protected Optional<HivePartition> getPartition(Path path, HiveTable table)
throws IOException {
- return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList("2020-03-17-03"))
- .withDbName("hivedb").withTableName("testTable").build());
+ String partitionValue = "";
+ if (path.toString().contains("hourly/2020/03/17/08")) {
+ partitionValue = "2020-03-17-08";
+ } else if (path.toString().contains("hourly/2020/03/17/09")) {
+ partitionValue = "2020-03-17-09";
+ } else if (path.toString().contains("daily/2020/03/17")) {
+ partitionValue = "2020-03-17-00";
+ }
+ return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
+ .withDbName("hivedb").withTableName("testIcebergTable").build());
}
protected Iterable<String> getDatabaseNames(Path path) {
return Lists.newArrayList("hivedb");
}
protected List<String> getTableNames(Optional<String> dbPrefix, Path path)
{
- return Lists.newArrayList("testTable");
+ return Lists.newArrayList("testIcebergTable");
}
}
}