This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 736829cdda add LazyRow abstraction for previously indexed record 
(#11826)
736829cdda is described below

commit 736829cdda01435be80a8b4286a3f8128a1ec18e
Author: rohit <[email protected]>
AuthorDate: Fri Oct 27 02:01:23 2023 +0530

    add LazyRow abstraction for previously indexed record (#11826)
---
 .../segment/local/segment/readers/LazyRow.java     | 102 ++++++++++++++++++
 ...oncurrentMapPartitionUpsertMetadataManager.java |   7 +-
 .../segment/local/upsert/PartialUpsertHandler.java |  55 ++++------
 .../segment/local/segment/readers/LazyRowTest.java | 119 +++++++++++++++++++++
 .../local/upsert/PartialUpsertHandlerTest.java     |   5 +-
 5 files changed, 252 insertions(+), 36 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
new file mode 100644
index 0000000000..4f05ae96f6
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * <p>A wrapper class to read column values of a row for a given {@link IndexSegment} and docId.<br>
+ * The advantage of wrapping the segment and docId is that column values are read only when
+ * {@link LazyRow#getValue(String)} is invoked.
+ * This is useful to reduce the disk reads incurred by loading the complete previous row during the merge step.
+ *
+ * <p>The LazyRow has internal state and should not be used concurrently. To reuse a LazyRow, create an instance
+ * using the no-arg constructor and re-initialise it using {@link LazyRow#init(IndexSegment, int)}.
+ */
+public class LazyRow {
+  private final Map<String, Object> _fieldToValueMap = new HashMap<>();
+  private final Set<String> _nullValueFields = new HashSet<>();
+  private IndexSegment _segment;
+  private int _docId;
+
+  public LazyRow() {
+  }
+
+  public void init(IndexSegment segment, int docId) {
+    clear();
+    _segment = segment;
+    _docId = docId;
+  }
+
+  /**
+   * Reads a field's value from the indexed row, caching non-null values for subsequent calls.
+   * @param fieldName name of the column to read
+   * @return the column value, or null if the persisted value is null
+   */
+  @Nullable
+  public Object getValue(String fieldName) {
+
+    // if field's value was previously read as null, return null
+    if (_nullValueFields.contains(fieldName)) {
+      return null;
+    }
+    if (_segment == null) {
+      throw new IllegalStateException("Index segment for Lazy row is 
uninitialized.");
+    }
+
+    // read the indexed value once: cache it in _fieldToValueMap, or record the field in _nullValueFields if null
+    return _fieldToValueMap.computeIfAbsent(fieldName, col -> {
+      Object value = null;
+      try (PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(_segment, col)) {
+        if (!columnReader.isNull(_docId)) {
+          value = columnReader.getValue(_docId);
+        } else {
+          _nullValueFields.add(fieldName);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Caught exception while closing 
pinotSegmentColumnReader for fieldName: %s", fieldName), e);
+      }
+      return value;
+    });
+  }
+
+  public boolean isNullValue(String fieldName) {
+    return _nullValueFields.contains(fieldName) || getValue(fieldName) == null;
+  }
+
+  public void clear() {
+    _fieldToValueMap.clear();
+    _nullValueFields.clear();
+  }
+
+  public Set<String> getColumnNames() {
+    if (_segment == null) {
+      throw new IllegalStateException("Index segment for Lazy row is 
uninitialized.");
+    }
+    return _segment.getColumnNames();
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index fc448527ff..47f2fa2a96 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -51,6 +52,9 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
 @ThreadSafe
 public class ConcurrentMapPartitionUpsertMetadataManager extends 
BasePartitionUpsertMetadataManager {
 
+  // Used to initialize a reference to previous row for merging in partial 
upsert
+  private final LazyRow _reusePreviousRow = new LazyRow();
+
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
@@ -302,7 +306,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
             ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
currentSegment.getQueryableDocIds();
             int currentDocId = recordLocation.getDocId();
             if (currentQueryableDocIds == null || 
currentQueryableDocIds.contains(currentDocId)) {
-              _partialUpsertHandler.merge(currentSegment, currentDocId, 
record);
+              _reusePreviousRow.init(currentSegment, currentDocId);
+              _partialUpsertHandler.merge(_reusePreviousRow, record);
             }
           }
           return recordLocation;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 0320fdf04e..8fef9c3602 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -18,15 +18,13 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
-import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -64,12 +62,12 @@ public class PartialUpsertHandler {
    * For example, overwrite merger will only override the prev value if the 
new value is not null.
    * Null values will override existing values if not configured. They can be 
ignored by using ignoreMerger.
    *
-   * @param indexSegment the segment of the last derived full record during 
ingestion.
-   * @param docId the docId of the last derived full record during ingestion 
in the segment.
+   * @param prevRecord wrapper for previous record, which lazily reads column 
values of previous row and caches for
+   *                   re-reads.
    * @param newRecord the new consumed record.
    */
-  public void merge(IndexSegment indexSegment, int docId, GenericRow 
newRecord) {
-    for (String column : indexSegment.getColumnNames()) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+    for (String column : prevRecord.getColumnNames()) {
       if (!_primaryKeyColumns.contains(column)) {
         PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, 
_defaultPartialUpsertMerger);
         // Non-overwrite mergers
@@ -77,40 +75,29 @@ public class PartialUpsertHandler {
         // (2) Else If the value of new value is null, use the previous value 
(even for comparison columns).
         // (3) Else If the column is not a comparison column, we applied the 
merged value to it.
         if (!(merger instanceof OverwriteMerger)) {
-          try (PinotSegmentColumnReader pinotSegmentColumnReader = new 
PinotSegmentColumnReader(indexSegment, column)) {
-            if (!pinotSegmentColumnReader.isNull(docId)) {
-              Object previousValue = pinotSegmentColumnReader.getValue(docId);
-              if (newRecord.isNullValue(column)) {
-                // Note that we intentionally want to overwrite any previous 
_comparisonColumn value in the case of
-                // using
-                // multiple comparison columns. We never apply a merge 
function to it, rather we just take any/all
-                // non-null comparison column values from the previous record, 
and the sole non-null comparison column
-                // value from the new record.
-                newRecord.putValue(column, previousValue);
-                newRecord.removeNullValueField(column);
-              } else if (!_comparisonColumns.contains(column)) {
-                newRecord.putValue(column, merger.merge(previousValue, 
newRecord.getValue(column)));
-              }
+          Object prevValue = prevRecord.getValue(column);
+          if (prevValue != null) {
+            if (newRecord.isNullValue(column)) {
+              // Note that we intentionally want to overwrite any previous 
_comparisonColumn value in the case of
+              // using
+              // multiple comparison columns. We never apply a merge function 
to it, rather we just take any/all
+              // non-null comparison column values from the previous record, 
and the sole non-null comparison column
+              // value from the new record.
+              newRecord.putValue(column, prevValue);
+              newRecord.removeNullValueField(column);
+            } else if (!_comparisonColumns.contains(column)) {
+              newRecord.putValue(column, merger.merge(prevValue, 
newRecord.getValue(column)));
             }
-          } catch (IOException e) {
-            throw new RuntimeException(
-                String.format("Caught exception while closing 
pinotSegmentColumnReader for column: %s", column), e);
           }
         } else {
           // Overwrite mergers.
           // (1) If the merge strategy is Overwrite merger and newValue is not 
null, skip and use the new value
           // (2) Otherwise, if previous is not null, init columnReader and use 
the previous value.
           if (newRecord.isNullValue(column)) {
-            try (PinotSegmentColumnReader pinotSegmentColumnReader = new 
PinotSegmentColumnReader(indexSegment,
-                column)) {
-              if (!pinotSegmentColumnReader.isNull(docId)) {
-                Object previousValue = 
pinotSegmentColumnReader.getValue(docId);
-                newRecord.putValue(column, previousValue);
-                newRecord.removeNullValueField(column);
-              }
-            } catch (IOException e) {
-              throw new RuntimeException(
-                  String.format("Caught exception while closing 
pinotSegmentColumnReader for column: %s", column), e);
+            Object prevValue = prevRecord.getValue(column);
+            if (prevValue != null) {
+              newRecord.putValue(column, prevValue);
+              newRecord.removeNullValueField(column);
             }
           }
         }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
new file mode 100644
index 0000000000..54ac86c25c
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class LazyRowTest {
+
+  private DataSource _col1Datasource;
+  private Dictionary _col2Dictionary;
+
+  @Test
+  public void testIsNullField() {
+    IndexSegment segment = getMockSegment();
+    LazyRow lazyRow = spy(new LazyRow());
+    lazyRow.init(segment, 1);
+
+    // first invocation will read from disk
+    assertTrue(lazyRow.isNullValue("col1"));
+
+    assertTrue(lazyRow.isNullValue("col1"));
+    // only one disk read.
+    verify(lazyRow, times(1)).getValue("col1");
+
+    // should return false when value exists for a field in an indexed row
+    assertFalse(lazyRow.isNullValue("col2"));
+  }
+
+  @Test
+  public void testGetValue() {
+    IndexSegment segment = getMockSegment();
+    LazyRow lazyRow = spy(new LazyRow());
+    lazyRow.init(segment, 1);
+
+    // should return persisted value
+    assertEquals(lazyRow.getValue("col2"), "val2");
+
+    // second invocation should read from LazyRow._fieldToValueMap
+    assertEquals(lazyRow.getValue("col2"), "val2");
+    // only one disk read
+    verify(_col2Dictionary, times(1)).get(1);
+
+    assertNull(lazyRow.getValue("col1"));
+  }
+
+  @Test
+  public void testGetColumnNames() {
+    IndexSegment segment = getMockSegment();
+    LazyRow lazyRow = new LazyRow();
+    lazyRow.init(segment, 1);
+    HashSet<String> columnNames = new HashSet<>(Arrays.asList("col1", "col2"));
+    when(segment.getColumnNames()).thenReturn(columnNames);
+
+    assertEquals(lazyRow.getColumnNames(), columnNames);
+  }
+
+  private IndexSegment getMockSegment() {
+    IndexSegment segment = mock(IndexSegment.class);
+    _col1Datasource = mock(DataSource.class);
+    DataSource col2Datasource = mock(DataSource.class);
+    when(segment.getDataSource("col1")).thenReturn(_col1Datasource);
+    when(segment.getDataSource("col2")).thenReturn(col2Datasource);
+
+    NullValueVectorReader col1NullVectorReader = 
mock(NullValueVectorReader.class);
+    when(col1NullVectorReader.isNull(1)).thenReturn(true);
+    NullValueVectorReader col2NullVectorReader = 
mock(NullValueVectorReader.class);
+    when(col2NullVectorReader.isNull(1)).thenReturn(false);
+    
when(_col1Datasource.getNullValueVector()).thenReturn(col1NullVectorReader);
+    when(col2Datasource.getNullValueVector()).thenReturn(col2NullVectorReader);
+
+    ForwardIndexReader col1ForwardIndexReader = mock(ForwardIndexReader.class);
+    when(col1ForwardIndexReader.isSingleValue()).thenReturn(true);
+    ForwardIndexReader col2ForwardIndexReader = mock(ForwardIndexReader.class);
+    when(col2ForwardIndexReader.isSingleValue()).thenReturn(true);
+    when(_col1Datasource.getForwardIndex()).thenReturn(col1ForwardIndexReader);
+    when(col2Datasource.getForwardIndex()).thenReturn(col2ForwardIndexReader);
+    when(col2ForwardIndexReader.getDictId(eq(1), any())).thenReturn(1);
+
+    Dictionary col1Dictionary = mock(Dictionary.class);
+    when(_col1Datasource.getDictionary()).thenReturn(col1Dictionary);
+    _col2Dictionary = mock(Dictionary.class);
+    when(col2Datasource.getDictionary()).thenReturn(_col2Dictionary);
+    when(_col2Dictionary.get(1)).thenReturn("val2");
+
+    return segment;
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index a577325773..f66aa2a679 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -86,6 +87,8 @@ public class PartialUpsertHandlerTest {
 
       ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
       when(segment.getColumnNames()).thenReturn(Sets.newSet("field1", 
"field2", "hoursSinceEpoch"));
+      LazyRow prevRecord = new LazyRow();
+      prevRecord.init(segment, 1);
 
       GenericRow row = new GenericRow();
       if (isNewNull) {
@@ -93,7 +96,7 @@ public class PartialUpsertHandlerTest {
       } else {
         row.putValue(columnName, newValue);
       }
-      handler.merge(segment, 1, row);
+      handler.merge(prevRecord, row);
       assertEquals(row.getValue(columnName), expectedValue);
       assertEquals(row.isNullValue(columnName), isExpectedNull);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to