This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 736829cdda add LazyRow abstraction for previously indexed record
(#11826)
736829cdda is described below
commit 736829cdda01435be80a8b4286a3f8128a1ec18e
Author: rohit <[email protected]>
AuthorDate: Fri Oct 27 02:01:23 2023 +0530
add LazyRow abstraction for previously indexed record (#11826)
---
.../segment/local/segment/readers/LazyRow.java | 102 ++++++++++++++++++
...oncurrentMapPartitionUpsertMetadataManager.java | 7 +-
.../segment/local/upsert/PartialUpsertHandler.java | 55 ++++------
.../segment/local/segment/readers/LazyRowTest.java | 119 +++++++++++++++++++++
.../local/upsert/PartialUpsertHandlerTest.java | 5 +-
5 files changed, 252 insertions(+), 36 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
new file mode 100644
index 0000000000..4f05ae96f6
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * <p>A wrapper class to read column values of a row for a given {@link
IndexSegment} and docId.<br>
+ * The advantage of having a wrapper over the segment and docId is that column values are
read only when
+ * {@link LazyRow#getValue(String)} is invoked.
+ * This is useful to reduce the disk reads incurred by loading the
complete previous row during the merge step.
+ *
+ * <p>The LazyRow has an internal state and should not be used concurrently.
To reuse the LazyRow, create an instance
+ * using no arg constructor and re-initialise using {@link
LazyRow#init(IndexSegment, int)}
+ */
+public class LazyRow {
+ private final Map<String, Object> _fieldToValueMap = new HashMap<>();
+ private final Set<String> _nullValueFields = new HashSet<>();
+ private IndexSegment _segment;
+ private int _docId;
+
+ public LazyRow() {
+ }
+
+ public void init(IndexSegment segment, int docId) {
+ clear();
+ _segment = segment;
+ _docId = docId;
+ }
+
+ /**
+ * Reads a field's value from the indexed row, caching the result for subsequent reads.
+ * @param fieldName name of the column to read
+ * @return the value, or null for persisted null values
+ */
+ @Nullable
+ public Object getValue(String fieldName) {
+
+ // if field's value was previously read as null, return null
+ if (_nullValueFields.contains(fieldName)) {
+ return null;
+ }
+ if (_segment == null) {
+ throw new IllegalStateException("Index segment for Lazy row is
uninitialized.");
+ }
+
+ // read the indexed value once: cache it in _fieldToValueMap, or record the field in
_nullValueFields when it is null
+ return _fieldToValueMap.computeIfAbsent(fieldName, col -> {
+ Object value = null;
+ try (PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(_segment, col)) {
+ if (!columnReader.isNull(_docId)) {
+ value = columnReader.getValue(_docId);
+ } else {
+ _nullValueFields.add(fieldName);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Caught exception while closing
pinotSegmentColumnReader for fieldName: %s", fieldName), e);
+ }
+ return value;
+ });
+ }
+
+ public boolean isNullValue(String fieldName) {
+ return _nullValueFields.contains(fieldName) || getValue(fieldName) == null;
+ }
+
+ public void clear() {
+ _fieldToValueMap.clear();
+ _nullValueFields.clear();
+ }
+
+ public Set<String> getColumnNames() {
+ if (_segment == null) {
+ throw new IllegalStateException("Index segment for Lazy row is
uninitialized.");
+ }
+ return _segment.getColumnNames();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index fc448527ff..47f2fa2a96 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -51,6 +52,9 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
@ThreadSafe
public class ConcurrentMapPartitionUpsertMetadataManager extends
BasePartitionUpsertMetadataManager {
+ // Used to initialize a reference to previous row for merging in partial
upsert
+ private final LazyRow _reusePreviousRow = new LazyRow();
+
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
@@ -302,7 +306,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
currentSegment.getQueryableDocIds();
int currentDocId = recordLocation.getDocId();
if (currentQueryableDocIds == null ||
currentQueryableDocIds.contains(currentDocId)) {
- _partialUpsertHandler.merge(currentSegment, currentDocId,
record);
+ _reusePreviousRow.init(currentSegment, currentDocId);
+ _partialUpsertHandler.merge(_reusePreviousRow, record);
}
}
return recordLocation;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 0320fdf04e..8fef9c3602 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -18,15 +18,13 @@
*/
package org.apache.pinot.segment.local.upsert;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
-import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -64,12 +62,12 @@ public class PartialUpsertHandler {
* For example, overwrite merger will only override the prev value if the
new value is not null.
* Null values will override existing values if not configured. They can be
ignored by using ignoreMerger.
*
- * @param indexSegment the segment of the last derived full record during
ingestion.
- * @param docId the docId of the last derived full record during ingestion
in the segment.
+ * @param prevRecord wrapper for previous record, which lazily reads column
values of previous row and caches for
+ * re-reads.
* @param newRecord the new consumed record.
*/
- public void merge(IndexSegment indexSegment, int docId, GenericRow
newRecord) {
- for (String column : indexSegment.getColumnNames()) {
+ public void merge(LazyRow prevRecord, GenericRow newRecord) {
+ for (String column : prevRecord.getColumnNames()) {
if (!_primaryKeyColumns.contains(column)) {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
// Non-overwrite mergers
@@ -77,40 +75,29 @@ public class PartialUpsertHandler {
// (2) Else If the value of new value is null, use the previous value
(even for comparison columns).
// (3) Else If the column is not a comparison column, we applied the
merged value to it.
if (!(merger instanceof OverwriteMerger)) {
- try (PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment, column)) {
- if (!pinotSegmentColumnReader.isNull(docId)) {
- Object previousValue = pinotSegmentColumnReader.getValue(docId);
- if (newRecord.isNullValue(column)) {
- // Note that we intentionally want to overwrite any previous
_comparisonColumn value in the case of
- // using
- // multiple comparison columns. We never apply a merge
function to it, rather we just take any/all
- // non-null comparison column values from the previous record,
and the sole non-null comparison column
- // value from the new record.
- newRecord.putValue(column, previousValue);
- newRecord.removeNullValueField(column);
- } else if (!_comparisonColumns.contains(column)) {
- newRecord.putValue(column, merger.merge(previousValue,
newRecord.getValue(column)));
- }
+ Object prevValue = prevRecord.getValue(column);
+ if (prevValue != null) {
+ if (newRecord.isNullValue(column)) {
+ // Note that we intentionally want to overwrite any previous
_comparisonColumn value in the case of
+ // using
+ // multiple comparison columns. We never apply a merge function
to it, rather we just take any/all
+ // non-null comparison column values from the previous record,
and the sole non-null comparison column
+ // value from the new record.
+ newRecord.putValue(column, prevValue);
+ newRecord.removeNullValueField(column);
+ } else if (!_comparisonColumns.contains(column)) {
+ newRecord.putValue(column, merger.merge(prevValue,
newRecord.getValue(column)));
}
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Caught exception while closing
pinotSegmentColumnReader for column: %s", column), e);
}
} else {
// Overwrite mergers.
// (1) If the merge strategy is Overwrite merger and newValue is not
null, skip and use the new value
// (2) Otherwise, if previous is not null, init columnReader and use
the previous value.
if (newRecord.isNullValue(column)) {
- try (PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment,
- column)) {
- if (!pinotSegmentColumnReader.isNull(docId)) {
- Object previousValue =
pinotSegmentColumnReader.getValue(docId);
- newRecord.putValue(column, previousValue);
- newRecord.removeNullValueField(column);
- }
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Caught exception while closing
pinotSegmentColumnReader for column: %s", column), e);
+ Object prevValue = prevRecord.getValue(column);
+ if (prevValue != null) {
+ newRecord.putValue(column, prevValue);
+ newRecord.removeNullValueField(column);
}
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
new file mode 100644
index 0000000000..54ac86c25c
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/LazyRowTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class LazyRowTest {
+
+ private DataSource _col1Datasource;
+ private Dictionary _col2Dictionary;
+
+ @Test
+ public void testIsNullField() {
+ IndexSegment segment = getMockSegment();
+ LazyRow lazyRow = spy(new LazyRow());
+ lazyRow.init(segment, 1);
+
+ // first invocation will read from disk
+ assertTrue(lazyRow.isNullValue("col1"));
+
+ assertTrue(lazyRow.isNullValue("col1"));
+ // only one disk read.
+ verify(lazyRow, times(1)).getValue("col1");
+
+ // should return false when value exists for a field in an indexed row
+ assertFalse(lazyRow.isNullValue("col2"));
+ }
+
+ @Test
+ public void testGetValue() {
+ IndexSegment segment = getMockSegment();
+ LazyRow lazyRow = spy(new LazyRow());
+ lazyRow.init(segment, 1);
+
+ // should return persisted value
+ assertEquals(lazyRow.getValue("col2"), "val2");
+
+ // second invocation should read from LazyRow._fieldToValueMap
+ assertEquals(lazyRow.getValue("col2"), "val2");
+ // only one disk read
+ verify(_col2Dictionary, times(1)).get(1);
+
+ assertNull(lazyRow.getValue("col1"));
+ }
+
+ @Test
+ public void testGetColumnNames() {
+ IndexSegment segment = getMockSegment();
+ LazyRow lazyRow = new LazyRow();
+ lazyRow.init(segment, 1);
+ HashSet<String> columnNames = new HashSet<>(Arrays.asList("col1", "col2"));
+ when(segment.getColumnNames()).thenReturn(columnNames);
+
+ assertEquals(lazyRow.getColumnNames(), columnNames);
+ }
+
+ private IndexSegment getMockSegment() {
+ IndexSegment segment = mock(IndexSegment.class);
+ _col1Datasource = mock(DataSource.class);
+ DataSource col2Datasource = mock(DataSource.class);
+ when(segment.getDataSource("col1")).thenReturn(_col1Datasource);
+ when(segment.getDataSource("col2")).thenReturn(col2Datasource);
+
+ NullValueVectorReader col1NullVectorReader =
mock(NullValueVectorReader.class);
+ when(col1NullVectorReader.isNull(1)).thenReturn(true);
+ NullValueVectorReader col2NullVectorReader =
mock(NullValueVectorReader.class);
+ when(col2NullVectorReader.isNull(1)).thenReturn(false);
+
when(_col1Datasource.getNullValueVector()).thenReturn(col1NullVectorReader);
+ when(col2Datasource.getNullValueVector()).thenReturn(col2NullVectorReader);
+
+ ForwardIndexReader col1ForwardIndexReader = mock(ForwardIndexReader.class);
+ when(col1ForwardIndexReader.isSingleValue()).thenReturn(true);
+ ForwardIndexReader col2ForwardIndexReader = mock(ForwardIndexReader.class);
+ when(col2ForwardIndexReader.isSingleValue()).thenReturn(true);
+ when(_col1Datasource.getForwardIndex()).thenReturn(col1ForwardIndexReader);
+ when(col2Datasource.getForwardIndex()).thenReturn(col2ForwardIndexReader);
+ when(col2ForwardIndexReader.getDictId(eq(1), any())).thenReturn(1);
+
+ Dictionary col1Dictionary = mock(Dictionary.class);
+ when(_col1Datasource.getDictionary()).thenReturn(col1Dictionary);
+ _col2Dictionary = mock(Dictionary.class);
+ when(col2Datasource.getDictionary()).thenReturn(_col2Dictionary);
+ when(_col2Dictionary.get(1)).thenReturn("val2");
+
+ return segment;
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index a577325773..f66aa2a679 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -86,6 +87,8 @@ public class PartialUpsertHandlerTest {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getColumnNames()).thenReturn(Sets.newSet("field1",
"field2", "hoursSinceEpoch"));
+ LazyRow prevRecord = new LazyRow();
+ prevRecord.init(segment, 1);
GenericRow row = new GenericRow();
if (isNewNull) {
@@ -93,7 +96,7 @@ public class PartialUpsertHandlerTest {
} else {
row.putValue(columnName, newValue);
}
- handler.merge(segment, 1, row);
+ handler.merge(prevRecord, row);
assertEquals(row.getValue(columnName), expectedValue);
assertEquals(row.isNullValue(columnName), isExpectedNull);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]