This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
     new 90d0a0e  Account for minor clock skew between concurrent commits (#1110)
90d0a0e is described below
commit 90d0a0e59fc146534b115f56dfe8d6d63156a198
Author: Ratandeep Ratti <[email protected]>
AuthorDate: Mon Jun 15 15:33:10 2020 -0700
Account for minor clock skew between concurrent commits (#1110)
---
.../java/org/apache/iceberg/TableMetadata.java | 26 +++++++++++++--
.../org/apache/iceberg/TableMetadataParser.java | 12 ++-----
.../java/org/apache/iceberg/TestTableMetadata.java | 37 ----------------------
3 files changed, 27 insertions(+), 48 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index e493a29..8501aa9 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.ValidationException;
@@ -49,6 +50,8 @@ public class TableMetadata implements Serializable {
static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
static final int INITIAL_SPEC_ID = 0;
+ private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);
+
/**
 * @deprecated will be removed in 0.9.0; use newTableMetadata(Schema, PartitionSpec, String, Map) instead.
*/
@@ -253,21 +256,40 @@ public class TableMetadata implements Serializable {
for (HistoryEntry logEntry : snapshotLog) {
if (last != null) {
Preconditions.checkArgument(
- (logEntry.timestampMillis() - last.timestampMillis()) >= 0,
+ (logEntry.timestampMillis() - last.timestampMillis()) >= -ONE_MINUTE,
"[BUG] Expected sorted snapshot log entries.");
}
last = logEntry;
}
+ if (last != null) {
+ Preconditions.checkArgument(
+ // commits can happen concurrently from different machines.
+ // A tolerance helps us avoid failure for small clock skew
+ lastUpdatedMillis - last.timestampMillis() >= -ONE_MINUTE,
+ "Invalid update timestamp %s: before last snapshot log entry at %s",
+ lastUpdatedMillis, last.timestampMillis());
+ }
MetadataLogEntry previous = null;
for (MetadataLogEntry metadataEntry : previousFiles) {
if (previous != null) {
Preconditions.checkArgument(
- (metadataEntry.timestampMillis() - previous.timestampMillis()) >= 0,
+ // commits can happen concurrently from different machines.
+ // A tolerance helps us avoid failure for small clock skew
+ (metadataEntry.timestampMillis() - previous.timestampMillis()) >= -ONE_MINUTE,
"[BUG] Expected sorted previous metadata log entries.");
}
previous = metadataEntry;
}
+ // Make sure that this update's lastUpdatedMillis is > max(previousFile's timestamp)
+ if (previous != null) {
+ Preconditions.checkArgument(
+ // commits can happen concurrently from different machines.
+ // A tolerance helps us avoid failure for small clock skew
+ lastUpdatedMillis - previous.timestampMillis >= -ONE_MINUTE,
+ "Invalid update timestamp %s: before the latest metadata log entry timestamp %s",
+ lastUpdatedMillis, previous.timestampMillis);
+ }
Preconditions.checkArgument(
currentSnapshotId < 0 || snapshotsById.containsKey(currentSnapshotId),
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 3f89aa6..bf61332 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -27,12 +27,10 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.SortedSet;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
@@ -44,7 +42,6 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.JsonUtil;
public class TableMetadataParser {
@@ -293,8 +290,7 @@ public class TableMetadataParser {
snapshots.add(SnapshotParser.fromJson(io, iterator.next()));
}
- SortedSet<SnapshotLogEntry> entries =
- Sets.newTreeSet(Comparator.comparingLong(SnapshotLogEntry::timestampMillis));
+ ImmutableList.Builder<HistoryEntry> entries = ImmutableList.builder();
if (node.has(SNAPSHOT_LOG)) {
Iterator<JsonNode> logIterator = node.get(SNAPSHOT_LOG).elements();
while (logIterator.hasNext()) {
@@ -304,8 +300,7 @@ public class TableMetadataParser {
}
}
- SortedSet<MetadataLogEntry> metadataEntries =
- Sets.newTreeSet(Comparator.comparingLong(MetadataLogEntry::timestampMillis));
+ ImmutableList.Builder<MetadataLogEntry> metadataEntries = ImmutableList.builder();
if (node.has(METADATA_LOG)) {
Iterator<JsonNode> logIterator = node.get(METADATA_LOG).elements();
while (logIterator.hasNext()) {
@@ -317,7 +312,6 @@ public class TableMetadataParser {
return new TableMetadata(file, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema,
defaultSpecId, specs, properties,
- currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
- ImmutableList.copyOf(metadataEntries.iterator()));
+ currentVersionId, snapshots, entries.build(), metadataEntries.build());
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 205a79e..72b2491 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -138,43 +138,6 @@ public class TestTableMetadata {
}
@Test
- public void testFromJsonSortsSnapshotLog() throws Exception {
- long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
- Snapshot previousSnapshot = new BaseSnapshot(
- ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
- long currentSnapshotId = System.currentTimeMillis();
- Snapshot currentSnapshot = new BaseSnapshot(
- ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
-
- List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
-
- TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
- 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
- ImmutableMap.of("property", "value"), currentSnapshotId,
- Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
-
- // add the entries after creating TableMetadata to avoid the sorted check
- reversedSnapshotLog.add(
- new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()));
- reversedSnapshotLog.add(
- new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()));
-
- String asJson = TableMetadataParser.toJson(expected);
- TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null,
- JsonUtil.mapper().readValue(asJson, JsonNode.class));
-
- List<SnapshotLogEntry> expectedSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
- .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
- .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
- .build();
-
- Assert.assertEquals("Snapshot logs should match",
- expectedSnapshotLog, metadata.snapshotLog());
- }
-
- @Test
public void testBackwardCompat() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TEST_SCHEMA).identity("x").withSpecId(6).build();