This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 62a4344 [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one
topic failure will not affect other topics in the same container (#3419)
62a4344 is described below
commit 62a434474d96cdf63bd4913846f6ca07f8c9e5e9
Author: Zihan Li <[email protected]>
AuthorDate: Tue Nov 9 17:24:28 2021 -0800
[GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will
not affect other topics in the same container (#3419)
* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure
will not affect other topics in the same container
* address comments
* change the way we set low watermark to have a better indication of the
watermark range of the snapshot
* address comments
* fix test error
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 9 ++
.../apache/gobblin/hive/writer/MetadataWriter.java | 10 ++
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 131 +++++++++++++++++----
.../iceberg/writer/GobblinMetadataException.java | 40 +++++++
.../iceberg/writer/IcebergMetadataWriter.java | 19 +--
.../iceberg/writer/HiveMetadataWriterTest.java | 5 +-
.../iceberg/writer/IcebergMetadataWriterTest.java | 39 +++++-
7 files changed, 218 insertions(+), 35 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 9ca6bb9..5ed0d47 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -123,6 +123,15 @@ public class HiveMetadataWriter implements MetadataWriter {
}
}
+ @Override
+ public void reset(String dbName, String tableName) throws IOException {
+ String tableKey = tableNameJoiner.join(dbName, tableName);
+ this.currentExecutionMap.remove(tableKey);
+ this.schemaCreationTimeMap.remove(tableKey);
+ this.latestSchemaMap.remove(tableKey);
+ this.specMaps.remove(tableKey);
+ }
+
public void write(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec)
throws IOException {
String dbName = tableSpec.getTable().getDbName();
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 6333688..e9457d1 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -48,6 +48,16 @@ public interface MetadataWriter extends Closeable {
void flush(String dbName, String tableName) throws IOException;
/**
+ * If something wrong happens, we want to clean up in-memory state for the
table inside the writer so that we can continue
+ * registration for this table without affecting correctness
+ *
+ * @param dbName The db name of metadata-registration target.
+ * @param tableName The table name of metadata-registration target.
+ * @throws IOException
+ */
+ void reset(String dbName, String tableName) throws IOException;
+
+ /**
* Compute and cache the metadata from the GMCE
* @param recordEnvelope Containing {@link GobblinMetadataChangeEvent}
* @param newSpecsMap The container (as a map) for new specs.
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index a6036e3..324ca85 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -29,9 +29,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.AllArgsConstructor;
+import lombok.Setter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -87,12 +91,15 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS =
"metadata.parallel.runner.timeout.mills";
public static final String HIVE_PARTITION_NAME = "hive.partition.name";
public static final String GMCE_METADATA_WRITER_CLASSES =
"gmce.metadata.writer.classes";
+ public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET =
"gmce.metadata.writer.max.error.dataset";
+ public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
public static final String TABLE_NAME_DELIMITER = ".";
@Getter
List<MetadataWriter> metadataWriters;
- Map<String, OperationType> tableOperationTypeMap;
- Map<String, OperationType> datasetOperationTypeMap;
+ Map<String, TableStatus> tableOperationTypeMap;
+ @Getter
+ Map<String, Map<String, GobblinMetadataException>> datasetErrorMap;
Set<String> acceptedClusters;
protected State state;
private final ParallelRunner parallelRunner;
@@ -101,18 +108,30 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
private Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps;
private Closer closer = Closer.create();
protected final AtomicLong recordCount = new AtomicLong(0L);
+ @Setter
+ private int maxErrorDataset;
+
+ @AllArgsConstructor
+ static class TableStatus {
+ OperationType operationType;
+ String datasetPath;
+ String gmceTopicPartition;
+ long gmceLowWatermark;
+ long gmceHighWatermark;
+ }
GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State
properties) throws IOException {
newSpecsMaps = new HashMap<>();
oldSpecsMaps = new HashMap<>();
metadataWriters = new ArrayList<>();
+ datasetErrorMap = new HashMap<>();
acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES,
ClustersNames.getInstance().getClusterName());
state = properties;
+ maxErrorDataset =
state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET,
DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES,
IcebergMetadataWriter.class.getName())) {
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class,
className, state)));
}
tableOperationTypeMap = new HashMap<>();
- datasetOperationTypeMap = new HashMap<>();
parallelRunner = closer.register(new
ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20),
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
@@ -187,6 +206,8 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
@Override
public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope)
throws IOException {
GenericRecord genericRecord = recordEnvelope.getRecord();
+ CheckpointableWatermark watermark = recordEnvelope.getWatermark();
+ Preconditions.checkNotNull(watermark);
//filter out the events that not emitted by accepted clusters
if (!acceptedClusters.contains(genericRecord.get("cluster"))) {
return;
@@ -197,13 +218,7 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
String datasetName = gmce.getDatasetIdentifier().toString();
//remove the old hive spec cache after flush
//Here we assume that the new hive spec for one path will always be the
same (ingestion flows register to the same tables)
- if (!datasetOperationTypeMap.containsKey(datasetName)) {
- oldSpecsMaps.remove(datasetName);
- }
- if (datasetOperationTypeMap.containsKey(datasetName)
- && datasetOperationTypeMap.get(datasetName) !=
gmce.getOperationType()) {
- datasetOperationTypeMap.put(datasetName, gmce.getOperationType());
- }
+ oldSpecsMaps.remove(datasetName);
// Mapping from URI of path of arrival files to the list of HiveSpec
objects.
// We assume in one same operation interval, for same dataset, the table
property will not change to reduce the time to compute hiveSpec.
@@ -248,20 +263,93 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
+ ((LongWatermark)watermark.getWatermark()).getValue()-1,
((LongWatermark)watermark.getWatermark()).getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
+ ((LongWatermark)watermark.getWatermark()).getValue()-1,
((LongWatermark)watermark.getWatermark()).getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
((LongWatermark)watermark.getWatermark()).getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ writer.reset(dbName, tableName);
+ addOrThrowException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+
+ private void addOrThrowException(Exception e, String tableString, String
dbName, String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, GobblinMetadataException> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
+ if (tableErrorMap.containsKey(tableString)) {
+ tableErrorMap.get(tableString).highWatermark =
tableStatus.gmceHighWatermark;
+ } else {
+ GobblinMetadataException gobblinMetadataException =
+ new GobblinMetadataException(tableStatus.datasetPath, dbName,
tableName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark,
tableStatus.gmceHighWatermark, e);
+ tableErrorMap.put(tableString, gobblinMetadataException);
+ }
+ this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ if (datasetErrorMap.size() > maxErrorDataset) {
+ //Fail the job if the error size exceeds some number
+ throw new IOException(String.format("Container fails to flush for more
than %s dataset, last exception we met is: ", maxErrorDataset), e);
+ }
+ tableOperationTypeMap.remove(tableString);
+ }
+
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void flush(String dbName, String tableName) throws IOException {
+ boolean meetException = false;
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ if (tableOperationTypeMap.get(tableString).gmceLowWatermark ==
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+ // No need to flush
+ return;
+ }
+ for (MetadataWriter writer : metadataWriters) {
+ if(meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.flush(dbName, tableName);
+ } catch (IOException e) {
+ meetException = true;
+ writer.reset(dbName, tableName);
+ addOrThrowException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ if (!meetException &&
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetPath)
+ &&
datasetErrorMap.get(tableOperationTypeMap.get(tableString).datasetPath).containsKey(tableString))
{
+ // We only want to emit GTE when the table watermark moves. There are
two scenarios where the watermark moves: one is after one flush interval,
+ // when we commit a new watermark to the state store; another is here, where
during the flush interval, we flush the table because the table operation changes.
+ // Under this condition, the error map containing this dataset means we met an
error before this flush, but this time the flush succeeded and
+ // the watermark inside the table moved, so we want to emit GTE to
indicate there is some data loss here
+ //todo: since we finished the flush for this table once, need to emit GTE to
indicate we missed data for this table
+ log.warn(String.format("Send GTE to indicate table flush failure for
%s", tableString));
+ }
+ }
/**
* Call the metadata writers to do flush each table metadata.
* Flush of metadata writer is the place that do real metadata
@@ -279,12 +367,11 @@ public class GobblinMCEWriter implements
DataWriter<GenericRecord> {
log.info(String.format("start to flushing %s records",
String.valueOf(recordCount.get())));
for (String tableString : tableOperationTypeMap.keySet()) {
List<String> tid =
Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(tid.get(0), tid.get(1));
- }
+ flush(tid.get(0), tid.get(1));
}
tableOperationTypeMap.clear();
recordCount.lazySet(0L);
+ //todo: after flush, watermark will move forward, so emit error GTE here
}
@Override
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
new file mode 100644
index 0000000..90f1e63
--- /dev/null
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+
+
+public class GobblinMetadataException extends IOException {
+ public String datasetPath;
+ public String dbName;
+ public String tableName;
+ public String GMCETopicPartition;
+ public long highWatermark;
+ public long lowWatermark;
+ public Exception exception;
+ GobblinMetadataException(String datasetPath, String dbName, String
tableName, String GMCETopicPartition, long lowWatermark, long highWatermark,
Exception exception) {
+ super(String.format("failed to flush table %s, %s", dbName, tableName),
exception);
+ this.datasetPath = datasetPath;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.GMCETopicPartition = GMCETopicPartition;
+ this.highWatermark = highWatermark;
+ this.lowWatermark = lowWatermark;
+ }
+}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 28051a8..db01793 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -114,7 +115,6 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import
org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
@@ -267,10 +267,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
currentWatermark =
icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY,
topicPartition)) ? Long.parseLong(
icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY,
topicPartition))) : DEFAULT_WATERMARK;
- if (currentWatermark != DEFAULT_WATERMARK) {
- // set the low watermark for current snapshot
- tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark = Optional.of(currentWatermark);
- }
return currentWatermark;
}
@@ -861,6 +857,11 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
}
+ @Override
+ public void reset(String dbName, String tableName) throws IOException {
+ this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ }
+
/**
* NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
* for that window (aka its is set to the beginning of next window)
@@ -1013,13 +1014,13 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
TableIdentifier tid =
TableIdentifier.of(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName());
String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
- t -> ((KafkaWatermark)
recordEnvelope.getWatermark()).getTopicPartition().toString());
+ t -> recordEnvelope.getWatermark().getSource());
Long currentWatermark = getAndPersistCurrentWatermark(tid,
topicPartition);
- Long currentOffset = ((KafkaWatermark)
recordEnvelope.getWatermark()).getLwm().getValue();
+ Long currentOffset =
((LongWatermark)recordEnvelope.getWatermark().getWatermark()).getValue();
if (currentOffset > currentWatermark) {
- if (currentWatermark == DEFAULT_WATERMARK) {
- //This means we haven't register this table or the GMCE topic
partition changed, we need to reset the low watermark
+ if (!tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark.isPresent()) {
+ //This means we haven't registered this table or we met some error
before, so we need to reset the low watermark
tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark =
Optional.of(currentOffset - 1);
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 70a3a2b..d6906d3 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -177,8 +177,7 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
state.setProp("gmce.metadata.writer.classes",
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
}
-
- @Test(dependsOnGroups={"icebergMetadataWriterTest"})
+ @Test
public void testHiveWriteAddFileGMCE() throws IOException {
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
@@ -215,7 +214,7 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
}
- @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"})
+ @Test(dependsOnMethods = {"testHiveWriteAddFileGMCE"},
groups={"hiveMetadataWriterTest"})
public void testHiveWriteRewriteFileGMCE() throws IOException {
gmce.setTopicPartitionOffsetsRange(null);
Map<String, String> registrationState = gmce.getRegistrationProperties();
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 8aba5e7..a1d9b18 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -188,7 +188,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
return state;
}
- @Test
+ @Test(dependsOnGroups={"hiveMetadataWriterTest"})
public void testWriteAddFileGMCE() throws IOException {
// Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
// without risking running into type cast runtime error.
@@ -325,6 +325,40 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"45");
}
+ @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"},
groups={"icebergMetadataWriterTest"})
+ public void testFaultTolerant() throws Exception {
+ // Set fault tolerant dataset number to be 1
+ gobblinMCEWriter.setMaxErrorDataset(1);
+ // Stop metaStore so write will fail
+ stopMetastore();
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(51L))));
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(52L))));
+ Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
+ Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
+ .get(new File(tmpDir,
"data/tracking/testIcebergTable").getAbsolutePath())
+ .get("hivedb.testIcebergTable").lowWatermark, 50L);
+ Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
+ .get(new File(tmpDir,
"data/tracking/testIcebergTable").getAbsolutePath())
+ .get("hivedb.testIcebergTable").highWatermark, 52L);
+ //We should not see exception as we have fault tolerant
+ gobblinMCEWriter.flush();
+
gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
+ GenericRecord genericGmce_differentDb =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ Assert.expectThrows(IOException.class, () ->
gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(53L))))));
+ // restart metastore to make sure the following tests run well
+ startMetastore();
+ }
+
@Test(dependsOnMethods={"testChangeProperty"},
groups={"icebergMetadataWriterTest"})
public void testWriteAddFileGMCECompleteness() throws IOException {
// Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
@@ -466,6 +500,9 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
return Lists.newArrayList("hivedb");
}
protected List<String> getTableNames(Optional<String> dbPrefix, Path path)
{
+ if (path.toString().contains("testFaultTolerant")) {
+ return Lists.newArrayList("testFaultTolerantIcebergTable");
+ }
return Lists.newArrayList("testIcebergTable");
}
}