This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8f0e49e handle indexing failures without corrupting inverted indexes (#8211)
8f0e49e is described below
commit 8f0e49ed804f63bd583ac207a41808e3b059b82f
Author: Richard Startin <[email protected]>
AuthorDate: Fri Feb 18 10:22:00 2022 +0000
handle indexing failures without corrupting inverted indexes (#8211)
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../indexsegment/mutable/MutableSegmentImpl.java | 94 +++++++++++----
.../local/realtime/impl/json/MutableJsonIndex.java | 13 +-
.../indexsegment/mutable/IndexingFailureTest.java | 131 +++++++++++++++++++++
.../mutable/MutableSegmentImplTestUtils.java | 22 +++-
5 files changed, 229 insertions(+), 32 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index b757728..4a9b290 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -65,6 +65,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
SEGMENT_DOWNLOAD_FAILURES("segments", false),
NUM_RESIZES("numResizes", false),
NO_TABLE_ACCESS("tables", true),
+ INDEXING_FAILURES("attributeValues", true),
// Netty connection metrics
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index f66e76b..40fad06 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -75,6 +75,7 @@ import
org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -466,20 +467,19 @@ public class MutableSegmentImpl implements MutableSegment
{
_logger.info("Newly added columns: " +
_newlyAddedColumnsFieldMap.toString());
}
- // NOTE: Okay for single-writer
- @SuppressWarnings("NonAtomicOperationOnVolatileField")
@Override
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
throws IOException {
boolean canTakeMore;
+ int numDocsIndexed = _numDocsIndexed;
if (isUpsertEnabled()) {
- PartitionUpsertMetadataManager.RecordInfo recordInfo =
getRecordInfo(row, _numDocsIndexed);
+ PartitionUpsertMetadataManager.RecordInfo recordInfo =
getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow =
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
updateDictionary(updatedRow);
- addNewRow(updatedRow);
+ addNewRow(numDocsIndexed, updatedRow);
// Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
// once validated
- canTakeMore = _numDocsIndexed++ < _capacity;
+ canTakeMore = numDocsIndexed++ < _capacity;
_partitionUpsertMetadataManager.addRecord(this, recordInfo);
} else {
// Update dictionary first
@@ -489,17 +489,18 @@ public class MutableSegmentImpl implements MutableSegment
{
// docId, else this will return a new docId.
int docId = getOrCreateDocId();
- if (docId == _numDocsIndexed) {
+ if (docId == numDocsIndexed) {
// New row
- addNewRow(row);
+ addNewRow(numDocsIndexed, row);
// Update number of documents indexed at last to make the latest row queryable
- canTakeMore = _numDocsIndexed++ < _capacity;
+ canTakeMore = numDocsIndexed++ < _capacity;
} else {
assert _aggregateMetrics;
aggregateMetrics(row, docId);
canTakeMore = true;
}
}
+ _numDocsIndexed = numDocsIndexed;
// Update last indexed time and latest ingestion time
_lastIndexedTimeMs = System.currentTimeMillis();
@@ -528,7 +529,9 @@ public class MutableSegmentImpl implements MutableSegment {
IndexContainer indexContainer = entry.getValue();
Object value = row.getValue(column);
MutableDictionary dictionary = indexContainer._dictionary;
- if (dictionary != null) {
+ if (value == null) {
+ recordIndexingError("DICTIONARY");
+ } else if (dictionary != null) {
if (indexContainer._fieldSpec.isSingleValueField()) {
indexContainer._dictId = dictionary.index(value);
} else {
@@ -542,13 +545,23 @@ public class MutableSegmentImpl implements MutableSegment
{
}
}
- private void addNewRow(GenericRow row)
- throws IOException {
- int docId = _numDocsIndexed;
+ private void addNewRow(int docId, GenericRow row) {
for (Map.Entry<String, IndexContainer> entry :
_indexContainerMap.entrySet()) {
String column = entry.getKey();
IndexContainer indexContainer = entry.getValue();
+
+ // Update the null value vector even if a null value is somehow produced
+ if (_nullHandlingEnabled && row.isNullValue(column)) {
+ indexContainer._nullValueVector.setNull(docId);
+ }
+
Object value = row.getValue(column);
+ if (value == null) {
+ // the value should not be null unless something is broken upstream but this will lead to inappropriate reuse
+ // of the dictionary id if this somehow happens. An NPE here can corrupt indexes leading to incorrect query
+ // results, hence the extra care. A metric will already have been emitted when trying to update the dictionary.
+ continue;
+ }
FieldSpec fieldSpec = indexContainer._fieldSpec;
if (fieldSpec.isSingleValueField()) {
// Single-value column
@@ -579,7 +592,11 @@ public class MutableSegmentImpl implements MutableSegment {
// Update inverted index
RealtimeInvertedIndexReader invertedIndex =
indexContainer._invertedIndex;
if (invertedIndex != null) {
- invertedIndex.add(dictId, docId);
+ try {
+ invertedIndex.add(dictId, docId);
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+ }
}
} else {
// Single-value column with raw index
@@ -636,19 +653,31 @@ public class MutableSegmentImpl implements MutableSegment
{
// Update text index
RealtimeLuceneTextIndexReader textIndex = indexContainer._textIndex;
if (textIndex != null) {
- textIndex.add((String) value);
+ try {
+ textIndex.add((String) value);
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.TEXT, e);
+ }
}
// Update json index
MutableJsonIndex jsonIndex = indexContainer._jsonIndex;
if (jsonIndex != null) {
- jsonIndex.add((String) value);
+ try {
+ jsonIndex.add((String) value);
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.JSON, e);
+ }
}
// Update H3 index
MutableH3Index h3Index = indexContainer._h3Index;
if (h3Index != null) {
- h3Index.add(GeometrySerializer.deserialize((byte[]) value));
+ try {
+ h3Index.add(GeometrySerializer.deserialize((byte[]) value));
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.H3, e);
+ }
}
} else {
// Multi-value column (always dictionary-encoded)
@@ -665,15 +694,30 @@ public class MutableSegmentImpl implements MutableSegment
{
RealtimeInvertedIndexReader invertedIndex =
indexContainer._invertedIndex;
if (invertedIndex != null) {
for (int dictId : dictIds) {
- invertedIndex.add(dictId, docId);
+ try {
+ invertedIndex.add(dictId, docId);
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+ }
}
}
}
+ }
+ }
- // Update null value vector
- if (_nullHandlingEnabled && row.isNullValue(column)) {
- indexContainer._nullValueVector.setNull(docId);
- }
+ private void recordIndexingError(FieldConfig.IndexType indexType, Exception
exception) {
+ _logger.error("failed to index value with {}", indexType, exception);
+ if (_serverMetrics != null) {
+ String metricKeyName = _realtimeTableName + "-" + indexType +
"-indexingError";
+ _serverMetrics.addMeteredTableValue(metricKeyName,
ServerMeter.INDEXING_FAILURES, 1);
+ }
+ }
+
+ private void recordIndexingError(String indexType) {
+ _logger.error("failed to index value with {}", indexType);
+ if (_serverMetrics != null) {
+ String metricKeyName = _realtimeTableName + "-" + indexType +
"-indexingError";
+ _serverMetrics.addMeteredTableValue(metricKeyName,
ServerMeter.INDEXING_FAILURES, 1);
}
}
@@ -900,7 +944,7 @@ public class MutableSegmentImpl implements MutableSegment {
public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) {
IndexContainer indexContainer = _indexContainerMap.get(column);
MutableDictionary dictionary = indexContainer._dictionary;
-
+ int numDocsIndexed = _numDocsIndexed;
// Sort all values in the dictionary
int numValues = dictionary.length();
int[] dictIds = new int[numValues];
@@ -911,7 +955,7 @@ public class MutableSegmentImpl implements MutableSegment {
// Re-order documents using the inverted index
RealtimeInvertedIndexReader invertedIndex = indexContainer._invertedIndex;
- int[] docIds = new int[_numDocsIndexed];
+ int[] docIds = new int[numDocsIndexed];
int[] batch = new int[256];
int docIdIndex = 0;
for (int dictId : dictIds) {
@@ -925,8 +969,8 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Sanity check
- Preconditions.checkState(_numDocsIndexed == docIdIndex,
- "The number of documents indexed: %s is not equal to the number of
sorted documents: %s", _numDocsIndexed,
+ Preconditions.checkState(numDocsIndexed == docIdIndex,
+ "The number of documents indexed: %s is not equal to the number of
sorted documents: %s", numDocsIndexed,
docIdIndex);
return docIds;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
index d5cf700..136fcd7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
@@ -73,12 +73,16 @@ public class MutableJsonIndex implements JsonIndexReader {
*/
public void add(String jsonString)
throws IOException {
- List<Map<String, String>> flattenedRecords =
JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
- _writeLock.lock();
try {
- addFlattenedRecords(flattenedRecords);
+ List<Map<String, String>> flattenedRecords =
JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
+ _writeLock.lock();
+ try {
+ addFlattenedRecords(flattenedRecords);
+ } finally {
+ _writeLock.unlock();
+ }
} finally {
- _writeLock.unlock();
+ _nextDocId++;
}
}
@@ -104,7 +108,6 @@ public class MutableJsonIndex implements JsonIndexReader {
}
_nextFlattenedDocId++;
}
- _nextDocId++;
}
@Override
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
new file mode 100644
index 0000000..07bb1ce
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexingFailureTest {
+ private static final String TABLE_NAME = "testTable";
+ private static final String INT_COL = "int_col";
+ private static final String STRING_COL = "string_col";
+ private static final String JSON_COL = "json_col";
+
+ private MutableSegmentImpl _mutableSegment;
+ private ServerMetrics _serverMetrics;
+
+ @BeforeMethod
+ public void setup() {
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+ .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(JSON_COL, FieldSpec.DataType.JSON)
+ .setSchemaName(TABLE_NAME)
+ .build();
+ _serverMetrics = mock(ServerMetrics.class);
+ _mutableSegment =
MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
+ Collections.emptySet(), Collections.emptySet(), new
HashSet<>(Arrays.asList(INT_COL, STRING_COL)),
+ Collections.singleton(JSON_COL), _serverMetrics);
+ }
+
+ @Test
+ public void testIndexingFailures()
+ throws IOException {
+ StreamMessageMetadata defaultMetadata = new
StreamMessageMetadata(System.currentTimeMillis());
+ GenericRow goodRow = new GenericRow();
+ goodRow.putValue(INT_COL, 0);
+ goodRow.putValue(STRING_COL, "a");
+ goodRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+ _mutableSegment.index(goodRow, defaultMetadata);
+ assertEquals(_mutableSegment.getNumDocsIndexed(), 1);
+
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+ ImmutableRoaringBitmap.bitmapOf(0));
+
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(0),
+ ImmutableRoaringBitmap.bitmapOf(0));
+
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
= 'json'"),
+ ImmutableRoaringBitmap.bitmapOf(0));
+ verify(_serverMetrics,
never()).addMeteredTableValue(matches("indexingError$"),
eq(ServerMeter.INDEXING_FAILURES),
+ anyLong());
+ reset(_serverMetrics);
+
+ GenericRow badRow = new GenericRow();
+ badRow.putValue(INT_COL, 0);
+ badRow.putValue(STRING_COL, "b");
+ badRow.putValue(JSON_COL, "{\"truncatedJson...");
+ _mutableSegment.index(badRow, defaultMetadata);
+ assertEquals(_mutableSegment.getNumDocsIndexed(), 2);
+
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+ ImmutableRoaringBitmap.bitmapOf(0, 1));
+
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(1),
+ ImmutableRoaringBitmap.bitmapOf(1));
+ verify(_serverMetrics,
times(1)).addMeteredTableValue(matches("-JSON-indexingError"),
+ eq(ServerMeter.INDEXING_FAILURES), eq(1L));
+ reset(_serverMetrics);
+
+ GenericRow anotherGoodRow = new GenericRow();
+ anotherGoodRow.putValue(INT_COL, 2);
+ anotherGoodRow.putValue(STRING_COL, "c");
+ anotherGoodRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+ _mutableSegment.index(anotherGoodRow, defaultMetadata);
+ assertEquals(_mutableSegment.getNumDocsIndexed(), 3);
+
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(1),
+ ImmutableRoaringBitmap.bitmapOf(2));
+
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(2),
+ ImmutableRoaringBitmap.bitmapOf(2));
+
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
= 'json'"),
+ ImmutableRoaringBitmap.bitmapOf(0, 2));
+ verify(_serverMetrics,
never()).addMeteredTableValue(matches("indexingError$"),
eq(ServerMeter.INDEXING_FAILURES),
+ anyLong());
+ reset(_serverMetrics);
+
+ GenericRow nullStringRow = new GenericRow();
+ nullStringRow.putValue(INT_COL, 0);
+ nullStringRow.putValue(STRING_COL, null);
+ nullStringRow.addNullValueField(STRING_COL);
+ nullStringRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+ _mutableSegment.index(nullStringRow, defaultMetadata);
+ assertEquals(_mutableSegment.getNumDocsIndexed(), 4);
+
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+ ImmutableRoaringBitmap.bitmapOf(0, 1, 3));
+
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
= 'json'"),
+ ImmutableRoaringBitmap.bitmapOf(0, 2, 3));
+
assertTrue(_mutableSegment.getDataSource(STRING_COL).getNullValueVector().isNull(3));
+ // null string value skipped
+ verify(_serverMetrics,
times(1)).addMeteredTableValue(matches("DICTIONARY-indexingError$"),
+ eq(ServerMeter.INDEXING_FAILURES),
+ eq(1L));
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 6c2e372..e715cdd 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.segment.local.indexsegment.mutable;
+import java.util.Collections;
import java.util.Set;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
@@ -57,6 +59,22 @@ public class MutableSegmentImplTestUtils {
Set<String> varLengthDictionaryColumns, Set<String>
invertedIndexColumns, boolean aggregateMetrics,
boolean nullHandlingEnabled, UpsertConfig upsertConfig, String
timeColumnName,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+ return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
+ Collections.emptySet(), aggregateMetrics, nullHandlingEnabled,
upsertConfig, timeColumnName,
+ partitionUpsertMetadataManager, null);
+ }
+
+ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
+ Set<String> varLengthDictionaryColumns, Set<String>
invertedIndexColumns, Set<String> jsonIndexColumns,
+ ServerMetrics serverMetrics) {
+ return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
+ jsonIndexColumns, false, true, null, null, null, serverMetrics);
+ }
+
+ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
+ Set<String> varLengthDictionaryColumns, Set<String>
invertedIndexColumns, Set<String> jsonIndexColumns,
+ boolean aggregateMetrics, boolean nullHandlingEnabled, UpsertConfig
upsertConfig, String timeColumnName,
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager,
ServerMetrics serverMetrics) {
RealtimeSegmentStatsHistory statsHistory =
mock(RealtimeSegmentStatsHistory.class);
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -68,13 +86,13 @@ public class MutableSegmentImplTestUtils {
RealtimeSegmentConfig realtimeSegmentConfig =
new
RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
.setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
-
.setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
+
.setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexColumns(jsonIndexColumns)
.setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
.setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
.setMemoryManager(new
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
.setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
- return new MutableSegmentImpl(realtimeSegmentConfig, null);
+ return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]