This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d050d98  [HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)
d050d98 is described below

commit d050d980715c0d8649531a3ed5ca7a7472b29760
Author: leesf <[email protected]>
AuthorDate: Tue Oct 8 01:56:46 2019 +0800

    [HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)
---
 .../org/apache/hudi/index/InMemoryHashIndex.java   |  2 +
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  2 +
 .../org/apache/hudi/index/hbase/HBaseIndex.java    |  2 +
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  2 +
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  2 +
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  2 +
 .../apache/hudi/func/TestUpdateMapFunction.java    |  2 +
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  4 ++
 .../org/apache/hudi/common/model/HoodieRecord.java | 23 +++++++
 .../apache/hudi/common/model/TestHoodieRecord.java | 74 ++++++++++++++++++++++
 .../hudi/common/util/SpillableMapTestUtils.java    |  2 +
 11 files changed, 117 insertions(+)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
index 0ef81cf..fdf2cbf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
@@ -132,7 +132,9 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
       while (hoodieRecordIterator.hasNext()) {
         HoodieRecord<T> rec = hoodieRecordIterator.next();
         if (recordLocationMap.containsKey(rec.getKey())) {
+          rec.unseal();
           rec.setCurrentLocation(recordLocationMap.get(rec.getKey()));
+          rec.seal();
         }
         taggedRecords.add(rec);
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 133f1e4..75016c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -371,7 +371,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
       // currentLocation 2 times and it will fail the second time. So creating a new in memory
       // copy of the hoodie record.
       record = new HoodieRecord<>(inputRecord);
+      record.unseal();
       record.setCurrentLocation(location.get());
+      record.seal();
     }
     return record;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 8399c4f..111d231 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -239,7 +239,9 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
                       currentRecord = new HoodieRecord(
                           new HoodieKey(currentRecord.getRecordKey(), partitionPath),
                           currentRecord.getData());
+                      currentRecord.unseal();
                       currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+                      currentRecord.seal();
                       taggedRecords.add(currentRecord);
                       // the key from Result and the key being processed should be same
                       assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 34178e6..0aa4137 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -293,7 +293,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
 
   private void writeToBuffer(HoodieRecord<T> record) {
     // update the new location of the record, so we know where to find it next
+    record.unseal();
     record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    record.seal();
     Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
     if (indexedRecord.isPresent()) {
       recordList.add(indexedRecord.get());
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 4cb935f..afb8f85 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -101,7 +101,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
         IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
         storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
         // update the new location of record, so we know where to find it next
+        record.unseal();
         record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
+        record.seal();
         recordsWritten++;
         insertRecordsWritten++;
       } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e1926d0..a819cf7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -208,7 +208,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       HoodieRecord<T> record = newRecordsItr.next();
       partitionPath = record.getPartitionPath();
       // update the new location of the record, so we know where to find it next
+      record.unseal();
       record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+      record.seal();
       //NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
       keyToNewRecords.put(record.getRecordKey(), record);
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
index 5837628..8bd29b1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
@@ -113,7 +113,9 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness {
       TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
       HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
           rowChange1);
+      record1.unseal();
       record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
+      record1.seal();
       updateRecords.add(record1);
 
       try {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6c0c0ca..12ef633 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -192,7 +192,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     TestRawTripPayload updateRowChanges1 = new TestRawTripPayload(updateRecordStr1);
     HoodieRecord updatedRecord1 = new HoodieRecord(
         new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1);
+    updatedRecord1.unseal();
     updatedRecord1.setCurrentLocation(new HoodieRecordLocation(null, FSUtils.getFileId(parquetFile.getName())));
+    updatedRecord1.seal();
 
     TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
     HoodieRecord insertedRecord1 = new HoodieRecord(
@@ -407,7 +409,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
     List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
     for (HoodieRecord updateRec : updateRecords) {
+      updateRec.unseal();
       updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
+      updateRec.seal();
     }
     List<HoodieRecord> records = new ArrayList<>();
     records.addAll(insertRecords);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index c03f175..a5043de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -63,17 +63,24 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    */
   private HoodieRecordLocation newLocation;
 
+  /**
+   * Indicates whether the object is sealed.
+   */
+  private boolean sealed;
+
   public HoodieRecord(HoodieKey key, T data) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
     this.newLocation = null;
+    this.sealed = false;
   }
 
   public HoodieRecord(HoodieRecord<T> record) {
     this(record.key, record.data);
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
+    this.sealed = record.sealed;
   }
 
   public HoodieKey getKey() {
@@ -100,6 +107,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    * Sets the current currentLocation of the record. This should happen exactly-once
    */
   public HoodieRecord setCurrentLocation(HoodieRecordLocation location) {
+    checkState();
     assert currentLocation == null;
     this.currentLocation = location;
     return this;
@@ -114,6 +122,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    * exactly-once.
    */
   public HoodieRecord setNewLocation(HoodieRecordLocation location) {
+    checkState();
     assert newLocation == null;
     this.newLocation = location;
     return this;
@@ -170,4 +179,18 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
     assert key != null;
     return key.getRecordKey();
   }
+
+  public void seal() {
+    this.sealed = true;
+  }
+
+  public void unseal() {
+    this.sealed = false;
+  }
+
+  public void checkState() {
+    if (sealed) {
+      throw new UnsupportedOperationException("Not allowed to modify after sealed");
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
new file mode 100644
index 0000000..408fedc
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HoodieRecord}.
+ */
+public class TestHoodieRecord {
+
+  private HoodieRecord hoodieRecord;
+
+  @Before
+  public void setUp() throws Exception {
+    final List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+    final List<HoodieRecord> hoodieRecords = indexedRecords.stream()
+              .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+                      new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
+    hoodieRecord = hoodieRecords.get(0);
+  }
+
+  @Test
+  public void testModificationAfterSeal() {
+    hoodieRecord.seal();
+    final HoodieRecordLocation location = new HoodieRecordLocation("100", "0");
+    try {
+      hoodieRecord.setCurrentLocation(location);
+      fail("should fail since modification after sealed is not allowed");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof UnsupportedOperationException);
+    }
+  }
+
+  @Test
+  public void testNormalModification() {
+    hoodieRecord.unseal();
+    final HoodieRecordLocation location = new HoodieRecordLocation("100", "0");
+    hoodieRecord.setCurrentLocation(location);
+    hoodieRecord.seal();
+
+    hoodieRecord.unseal();
+    hoodieRecord.setNewLocation(location);
+    hoodieRecord.seal();
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
index 62a36e8..c0b2d8f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
@@ -45,7 +45,9 @@ public class SpillableMapTestUtils {
           recordKeys.add(key);
           HoodieRecord record = new HoodieRecord<>(new HoodieKey(key, partitionPath),
               new HoodieAvroPayload(Option.of((GenericRecord) r)));
+          record.unseal();
           record.setCurrentLocation(new HoodieRecordLocation("DUMMY_COMMIT_TIME", "DUMMY_FILE_ID"));
+          record.seal();
           records.put(key, record);
         });
     return recordKeys;

Reply via email to