ZihanLi58 commented on a change in pull request #3419:
URL: https://github.com/apache/gobblin/pull/3419#discussion_r742452232
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ if (datasetErrorMap.size() > maxErrorDataset) {
+ //Fail the job if the error size exceeds some number
+ throw new IOException(String.format("Container fails to flush for more
than %s dataset", maxErrorDataset));
Review comment:
I print out the exception, and we will also emit a GTE for those exceptions.
But I will update the thrown exception to contain the last exception we met.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ if (datasetErrorMap.size() > maxErrorDataset) {
+ //Fail the job if the error size exceeds some number
+ throw new IOException(String.format("Container fails to flush for more
than %s dataset", maxErrorDataset));
+ }
+ tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+ }
+
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void flush(String dbName, String tableName) throws IOException {
+ boolean meetException = false;
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ if (tableOperationTypeMap.get(tableString).gmceLowWatermark ==
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+ // No need to flush
+ return;
+ }
+ for (MetadataWriter writer : metadataWriters) {
+ if(meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.flush(dbName, tableName);
+ } catch (IOException e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ if (!meetException &&
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)
Review comment:
We want to emit GTE only when the actual watermark moves. That happens either
in the flush() method, where the watermark in the state store moves, or when we
successfully commit one snapshot so that the high watermark in the Iceberg table
property moves. That's why the condition here is that meetException is false, as
this means we successfully committed once for this table after we met an
exception. Another place we will emit GTE is at the end of the flush() (not
flush(db,table)) method.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
Review comment:
Sorry for the confusion. I was using datasetName to be consistent with
the metadata schema, where they treat the path as datasetName. I will change
datasetName to datasetPath to avoid this confusion. It's not a 1:1 mapping, as
one dataset can be registered into multiple tables.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ if (datasetErrorMap.size() > maxErrorDataset) {
+ //Fail the job if the error size exceeds some number
+ throw new IOException(String.format("Container fails to flush for more
than %s dataset", maxErrorDataset));
Review comment:
I print out the exception, and we will also emit a GTE for those exceptions.
But I will update the thrown exception to contain the last exception we met.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+ log.error(String.format("Meet exception when flush table %s",
tableString), e);
+ if (datasetErrorMap.size() > maxErrorDataset) {
+ //Fail the job if the error size exceeds some number
+ throw new IOException(String.format("Container fails to flush for more
than %s dataset", maxErrorDataset));
+ }
+ tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+ }
+
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void flush(String dbName, String tableName) throws IOException {
+ boolean meetException = false;
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ if (tableOperationTypeMap.get(tableString).gmceLowWatermark ==
tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+ // No need to flush
+ return;
+ }
+ for (MetadataWriter writer : metadataWriters) {
+ if(meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.flush(dbName, tableName);
+ } catch (IOException e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ if (!meetException &&
datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)
Review comment:
We want to emit GTE only when the actual watermark moves. That happens either
in the flush() method, where the watermark in the state store moves, or when we
successfully commit one snapshot so that the high watermark in the Iceberg table
property moves. That's why the condition here is that meetException is false, as
this means we successfully committed once for this table after we met an
exception. Another place we will emit GTE is at the end of the flush() (not
flush(db,table)) method.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord>
recordEnvelope) throws I
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
- if (tableOperationTypeMap.containsKey(tableString)
- && tableOperationTypeMap.get(tableString) !=
gmce.getOperationType()) {
- for (MetadataWriter writer : metadataWriters) {
- writer.flush(dbName, tableName);
- }
- }
- tableOperationTypeMap.put(tableString, gmce.getOperationType());
- for (MetadataWriter writer : metadataWriters) {
- writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ if (!tableOperationTypeMap.containsKey(tableString)) {
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+ } else if (tableOperationTypeMap.get(tableString).operationType !=
gmce.getOperationType()) {
+ flush(dbName, tableName);
+ tableOperationTypeMap.put(tableString, new
TableStatus(gmce.getOperationType(),
+ gmce.getDatasetIdentifier().getNativeName(),
watermark.getTopicPartition().getTopicName(),
watermark.getTopicPartition().getId(),
+ watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
}
+ tableOperationTypeMap.get(tableString).gmceHighWatermark =
watermark.getLwm().getValue();
+ write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
+ // Add fault tolerant ability and make sure we can emit GTE as desired
+ private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap
newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ boolean meetException = false;
+ String dbName = spec.getTable().getDbName();
+ String tableName = spec.getTable().getTableName();
+ String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName,
tableName);
+ for (MetadataWriter writer : metadataWriters) {
+ if (meetException) {
+ writer.reset(dbName, tableName);
+ } else {
+ try {
+ writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+ } catch (Exception e) {
+ meetException = true;
+ addException(e, tableString, dbName, tableName);
+ }
+ }
+ }
+ }
+ private void addException(Exception e, String tableString, String dbName,
String tableName) throws IOException{
+ TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+ Map<String, Queue<MetadataFlushException>> tableErrorMap =
this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+ Queue<MetadataFlushException> errors =
tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+ if (!errors.isEmpty() && errors.peek().highWatermark ==
tableStatus.gmceLowWatermark) {
+ errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ MetadataFlushException metadataFlushException =
+ new MetadataFlushException(tableStatus.datasetName, dbName,
tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+ errors.offer(metadataFlushException);
+ }
+ tableErrorMap.put(tableString, errors);
+ this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
Review comment:
Sorry for the confusion. I was using datasetName to be consistent with
the metadata schema, where they treat the path as datasetName. I will change
datasetName to datasetPath to avoid this confusion. It's not a 1:1 mapping, as
one dataset can be registered into multiple tables.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]