This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 90d0a0e  Account for minor clock skew between concurrent commits (#1110)
90d0a0e is described below

commit 90d0a0e59fc146534b115f56dfe8d6d63156a198
Author: Ratandeep Ratti <[email protected]>
AuthorDate: Mon Jun 15 15:33:10 2020 -0700

    Account for minor clock skew between concurrent commits (#1110)
---
 .../java/org/apache/iceberg/TableMetadata.java     | 26 +++++++++++++--
 .../org/apache/iceberg/TableMetadataParser.java    | 12 ++-----
 .../java/org/apache/iceberg/TestTableMetadata.java | 37 ----------------------
 3 files changed, 27 insertions(+), 48 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index e493a29..8501aa9 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -49,6 +50,8 @@ public class TableMetadata implements Serializable {
   static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
   static final int INITIAL_SPEC_ID = 0;
 
+  private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);
+
   /**
   * @deprecated will be removed in 0.9.0; use newTableMetadata(Schema, PartitionSpec, String, Map) instead.
    */
@@ -253,21 +256,40 @@ public class TableMetadata implements Serializable {
     for (HistoryEntry logEntry : snapshotLog) {
       if (last != null) {
         Preconditions.checkArgument(
-            (logEntry.timestampMillis() - last.timestampMillis()) >= 0,
+            (logEntry.timestampMillis() - last.timestampMillis()) >= -ONE_MINUTE,
             "[BUG] Expected sorted snapshot log entries.");
       }
       last = logEntry;
     }
+    if (last != null) {
+      Preconditions.checkArgument(
+          // commits can happen concurrently from different machines.
+          // A tolerance helps us avoid failure for small clock skew
+          lastUpdatedMillis - last.timestampMillis() >= -ONE_MINUTE,
+          "Invalid update timestamp %s: before last snapshot log entry at %s",
+          lastUpdatedMillis, last.timestampMillis());
+    }
 
     MetadataLogEntry previous = null;
     for (MetadataLogEntry metadataEntry : previousFiles) {
       if (previous != null) {
         Preconditions.checkArgument(
-            (metadataEntry.timestampMillis() - previous.timestampMillis()) >= 0,
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            (metadataEntry.timestampMillis() - previous.timestampMillis()) >= -ONE_MINUTE,
             "[BUG] Expected sorted previous metadata log entries.");
       }
       previous = metadataEntry;
     }
+      // Make sure that this update's lastUpdatedMillis is > max(previousFile's timestamp)
+    if (previous != null) {
+      Preconditions.checkArgument(
+          // commits can happen concurrently from different machines.
+          // A tolerance helps us avoid failure for small clock skew
+          lastUpdatedMillis - previous.timestampMillis >= -ONE_MINUTE,
+          "Invalid update timestamp %s: before the latest metadata log entry timestamp %s",
+          lastUpdatedMillis, previous.timestampMillis);
+    }
 
     Preconditions.checkArgument(
         currentSnapshotId < 0 || snapshotsById.containsKey(currentSnapshotId),
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 3f89aa6..bf61332 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -27,12 +27,10 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.SortedSet;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import org.apache.iceberg.TableMetadata.MetadataLogEntry;
@@ -44,7 +42,6 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.JsonUtil;
 
 public class TableMetadataParser {
@@ -293,8 +290,7 @@ public class TableMetadataParser {
       snapshots.add(SnapshotParser.fromJson(io, iterator.next()));
     }
 
-    SortedSet<SnapshotLogEntry> entries =
-        Sets.newTreeSet(Comparator.comparingLong(SnapshotLogEntry::timestampMillis));
+    ImmutableList.Builder<HistoryEntry> entries = ImmutableList.builder();
     if (node.has(SNAPSHOT_LOG)) {
       Iterator<JsonNode> logIterator = node.get(SNAPSHOT_LOG).elements();
       while (logIterator.hasNext()) {
@@ -304,8 +300,7 @@ public class TableMetadataParser {
       }
     }
 
-    SortedSet<MetadataLogEntry> metadataEntries =
-            Sets.newTreeSet(Comparator.comparingLong(MetadataLogEntry::timestampMillis));
+    ImmutableList.Builder<MetadataLogEntry> metadataEntries = ImmutableList.builder();
     if (node.has(METADATA_LOG)) {
       Iterator<JsonNode> logIterator = node.get(METADATA_LOG).elements();
       while (logIterator.hasNext()) {
@@ -317,7 +312,6 @@ public class TableMetadataParser {
 
     return new TableMetadata(file, formatVersion, uuid, location,
         lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
-        currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
-        ImmutableList.copyOf(metadataEntries.iterator()));
+        currentVersionId, snapshots, entries.build(), metadataEntries.build());
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 205a79e..72b2491 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -138,43 +138,6 @@ public class TestTableMetadata {
   }
 
   @Test
-  public void testFromJsonSortsSnapshotLog() throws Exception {
-    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
-    Snapshot previousSnapshot = new BaseSnapshot(
-        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
-    long currentSnapshotId = System.currentTimeMillis();
-    Snapshot currentSnapshot = new BaseSnapshot(
-        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
-
-    List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
-
-    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
-        0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
-        ImmutableMap.of("property", "value"), currentSnapshotId,
-        Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
-
-    // add the entries after creating TableMetadata to avoid the sorted check
-    reversedSnapshotLog.add(
-        new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()));
-    reversedSnapshotLog.add(
-        new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()));
-
-    String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null,
-        JsonUtil.mapper().readValue(asJson, JsonNode.class));
-
-    List<SnapshotLogEntry> expectedSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
-        .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
-        .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
-        .build();
-
-    Assert.assertEquals("Snapshot logs should match",
-        expectedSnapshotLog, metadata.snapshotLog());
-  }
-
-  @Test
   public void testBackwardCompat() throws Exception {
     PartitionSpec spec = PartitionSpec.builderFor(TEST_SCHEMA).identity("x").withSpecId(6).build();
 

Reply via email to