This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0e49e  handle indexing failures without corrupting inverted indexes (#8211)
8f0e49e is described below

commit 8f0e49ed804f63bd583ac207a41808e3b059b82f
Author: Richard Startin <[email protected]>
AuthorDate: Fri Feb 18 10:22:00 2022 +0000

    handle indexing failures without corrupting inverted indexes (#8211)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |  94 +++++++++++----
 .../local/realtime/impl/json/MutableJsonIndex.java |  13 +-
 .../indexsegment/mutable/IndexingFailureTest.java  | 131 +++++++++++++++++++++
 .../mutable/MutableSegmentImplTestUtils.java       |  22 +++-
 5 files changed, 229 insertions(+), 32 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index b757728..4a9b290 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -65,6 +65,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   SEGMENT_DOWNLOAD_FAILURES("segments", false),
   NUM_RESIZES("numResizes", false),
   NO_TABLE_ACCESS("tables", true),
+  INDEXING_FAILURES("attributeValues", true),
 
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index f66e76b..40fad06 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -75,6 +75,7 @@ import 
org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -466,20 +467,19 @@ public class MutableSegmentImpl implements MutableSegment 
{
     _logger.info("Newly added columns: " + 
_newlyAddedColumnsFieldMap.toString());
   }
 
-  // NOTE: Okay for single-writer
-  @SuppressWarnings("NonAtomicOperationOnVolatileField")
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
       throws IOException {
     boolean canTakeMore;
+    int numDocsIndexed = _numDocsIndexed;
     if (isUpsertEnabled()) {
-      PartitionUpsertMetadataManager.RecordInfo recordInfo = 
getRecordInfo(row, _numDocsIndexed);
+      PartitionUpsertMetadataManager.RecordInfo recordInfo = 
getRecordInfo(row, numDocsIndexed);
       GenericRow updatedRow = 
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
       updateDictionary(updatedRow);
-      addNewRow(updatedRow);
+      addNewRow(numDocsIndexed, updatedRow);
       // Update number of documents indexed before handling the upsert 
metadata so that the record becomes queryable
       // once validated
-      canTakeMore = _numDocsIndexed++ < _capacity;
+      canTakeMore = numDocsIndexed++ < _capacity;
       _partitionUpsertMetadataManager.addRecord(this, recordInfo);
     } else {
       // Update dictionary first
@@ -489,17 +489,18 @@ public class MutableSegmentImpl implements MutableSegment 
{
       // docId, else this will return a new docId.
       int docId = getOrCreateDocId();
 
-      if (docId == _numDocsIndexed) {
+      if (docId == numDocsIndexed) {
         // New row
-        addNewRow(row);
+        addNewRow(numDocsIndexed, row);
         // Update number of documents indexed at last to make the latest row 
queryable
-        canTakeMore = _numDocsIndexed++ < _capacity;
+        canTakeMore = numDocsIndexed++ < _capacity;
       } else {
         assert _aggregateMetrics;
         aggregateMetrics(row, docId);
         canTakeMore = true;
       }
     }
+    _numDocsIndexed = numDocsIndexed;
 
     // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
@@ -528,7 +529,9 @@ public class MutableSegmentImpl implements MutableSegment {
       IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
       MutableDictionary dictionary = indexContainer._dictionary;
-      if (dictionary != null) {
+      if (value == null) {
+        recordIndexingError("DICTIONARY");
+      } else if (dictionary != null) {
         if (indexContainer._fieldSpec.isSingleValueField()) {
           indexContainer._dictId = dictionary.index(value);
         } else {
@@ -542,13 +545,23 @@ public class MutableSegmentImpl implements MutableSegment 
{
     }
   }
 
-  private void addNewRow(GenericRow row)
-      throws IOException {
-    int docId = _numDocsIndexed;
+  private void addNewRow(int docId, GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : 
_indexContainerMap.entrySet()) {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
+
+      // Update the null value vector even if a null value is somehow produced
+      if (_nullHandlingEnabled && row.isNullValue(column)) {
+        indexContainer._nullValueVector.setNull(docId);
+      }
+
       Object value = row.getValue(column);
+      if (value == null) {
+        // the value should not be null unless something is broken upstream 
but this will lead to inappropriate reuse
+        // of the dictionary id if this somehow happens. An NPE here can 
corrupt indexes leading to incorrect query
+        // results, hence the extra care. A metric will already have been 
emitted when trying to update the dictionary.
+        continue;
+      }
       FieldSpec fieldSpec = indexContainer._fieldSpec;
       if (fieldSpec.isSingleValueField()) {
         // Single-value column
@@ -579,7 +592,11 @@ public class MutableSegmentImpl implements MutableSegment {
           // Update inverted index
           RealtimeInvertedIndexReader invertedIndex = 
indexContainer._invertedIndex;
           if (invertedIndex != null) {
-            invertedIndex.add(dictId, docId);
+            try {
+              invertedIndex.add(dictId, docId);
+            } catch (Exception e) {
+              recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+            }
           }
         } else {
           // Single-value column with raw index
@@ -636,19 +653,31 @@ public class MutableSegmentImpl implements MutableSegment 
{
         // Update text index
         RealtimeLuceneTextIndexReader textIndex = indexContainer._textIndex;
         if (textIndex != null) {
-          textIndex.add((String) value);
+          try {
+            textIndex.add((String) value);
+          } catch (Exception e) {
+            recordIndexingError(FieldConfig.IndexType.TEXT, e);
+          }
         }
 
         // Update json index
         MutableJsonIndex jsonIndex = indexContainer._jsonIndex;
         if (jsonIndex != null) {
-          jsonIndex.add((String) value);
+          try {
+            jsonIndex.add((String) value);
+          } catch (Exception e) {
+            recordIndexingError(FieldConfig.IndexType.JSON, e);
+          }
         }
 
         // Update H3 index
         MutableH3Index h3Index = indexContainer._h3Index;
         if (h3Index != null) {
-          h3Index.add(GeometrySerializer.deserialize((byte[]) value));
+          try {
+            h3Index.add(GeometrySerializer.deserialize((byte[]) value));
+          } catch (Exception e) {
+            recordIndexingError(FieldConfig.IndexType.H3, e);
+          }
         }
       } else {
         // Multi-value column (always dictionary-encoded)
@@ -665,15 +694,30 @@ public class MutableSegmentImpl implements MutableSegment 
{
         RealtimeInvertedIndexReader invertedIndex = 
indexContainer._invertedIndex;
         if (invertedIndex != null) {
           for (int dictId : dictIds) {
-            invertedIndex.add(dictId, docId);
+            try {
+              invertedIndex.add(dictId, docId);
+            } catch (Exception e) {
+              recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+            }
           }
         }
       }
+    }
+  }
 
-      // Update null value vector
-      if (_nullHandlingEnabled && row.isNullValue(column)) {
-        indexContainer._nullValueVector.setNull(docId);
-      }
+  private void recordIndexingError(FieldConfig.IndexType indexType, Exception 
exception) {
+    _logger.error("failed to index value with {}", indexType, exception);
+    if (_serverMetrics != null) {
+      String metricKeyName = _realtimeTableName + "-" + indexType + 
"-indexingError";
+      _serverMetrics.addMeteredTableValue(metricKeyName, 
ServerMeter.INDEXING_FAILURES, 1);
+    }
+  }
+
+  private void recordIndexingError(String indexType) {
+    _logger.error("failed to index value with {}", indexType);
+    if (_serverMetrics != null) {
+      String metricKeyName = _realtimeTableName + "-" + indexType + 
"-indexingError";
+      _serverMetrics.addMeteredTableValue(metricKeyName, 
ServerMeter.INDEXING_FAILURES, 1);
     }
   }
 
@@ -900,7 +944,7 @@ public class MutableSegmentImpl implements MutableSegment {
   public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) {
     IndexContainer indexContainer = _indexContainerMap.get(column);
     MutableDictionary dictionary = indexContainer._dictionary;
-
+    int numDocsIndexed = _numDocsIndexed;
     // Sort all values in the dictionary
     int numValues = dictionary.length();
     int[] dictIds = new int[numValues];
@@ -911,7 +955,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
     // Re-order documents using the inverted index
     RealtimeInvertedIndexReader invertedIndex = indexContainer._invertedIndex;
-    int[] docIds = new int[_numDocsIndexed];
+    int[] docIds = new int[numDocsIndexed];
     int[] batch = new int[256];
     int docIdIndex = 0;
     for (int dictId : dictIds) {
@@ -925,8 +969,8 @@ public class MutableSegmentImpl implements MutableSegment {
     }
 
     // Sanity check
-    Preconditions.checkState(_numDocsIndexed == docIdIndex,
-        "The number of documents indexed: %s is not equal to the number of 
sorted documents: %s", _numDocsIndexed,
+    Preconditions.checkState(numDocsIndexed == docIdIndex,
+        "The number of documents indexed: %s is not equal to the number of 
sorted documents: %s", numDocsIndexed,
         docIdIndex);
 
     return docIds;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
index d5cf700..136fcd7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
@@ -73,12 +73,16 @@ public class MutableJsonIndex implements JsonIndexReader {
    */
   public void add(String jsonString)
       throws IOException {
-    List<Map<String, String>> flattenedRecords = 
JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
-    _writeLock.lock();
     try {
-      addFlattenedRecords(flattenedRecords);
+      List<Map<String, String>> flattenedRecords = 
JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
+      _writeLock.lock();
+      try {
+        addFlattenedRecords(flattenedRecords);
+      } finally {
+        _writeLock.unlock();
+      }
     } finally {
-      _writeLock.unlock();
+      _nextDocId++;
     }
   }
 
@@ -104,7 +108,6 @@ public class MutableJsonIndex implements JsonIndexReader {
       }
       _nextFlattenedDocId++;
     }
-    _nextDocId++;
   }
 
   @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
new file mode 100644
index 0000000..07bb1ce
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexingFailureTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String INT_COL = "int_col";
+  private static final String STRING_COL = "string_col";
+  private static final String JSON_COL = "json_col";
+
+  private MutableSegmentImpl _mutableSegment;
+  private ServerMetrics _serverMetrics;
+
+  @BeforeMethod
+  public void setup() {
+    Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(JSON_COL, FieldSpec.DataType.JSON)
+        .setSchemaName(TABLE_NAME)
+        .build();
+    _serverMetrics = mock(ServerMetrics.class);
+    _mutableSegment = 
MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
+        Collections.emptySet(), Collections.emptySet(), new 
HashSet<>(Arrays.asList(INT_COL, STRING_COL)),
+        Collections.singleton(JSON_COL), _serverMetrics);
+  }
+
+  @Test
+  public void testIndexingFailures()
+      throws IOException {
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+    GenericRow goodRow = new GenericRow();
+    goodRow.putValue(INT_COL, 0);
+    goodRow.putValue(STRING_COL, "a");
+    goodRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+    _mutableSegment.index(goodRow, defaultMetadata);
+    assertEquals(_mutableSegment.getNumDocsIndexed(), 1);
+    
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+        ImmutableRoaringBitmap.bitmapOf(0));
+    
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(0),
+        ImmutableRoaringBitmap.bitmapOf(0));
+    
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
 = 'json'"),
+        ImmutableRoaringBitmap.bitmapOf(0));
+    verify(_serverMetrics, 
never()).addMeteredTableValue(matches("indexingError$"), 
eq(ServerMeter.INDEXING_FAILURES),
+        anyLong());
+    reset(_serverMetrics);
+
+    GenericRow badRow = new GenericRow();
+    badRow.putValue(INT_COL, 0);
+    badRow.putValue(STRING_COL, "b");
+    badRow.putValue(JSON_COL, "{\"truncatedJson...");
+    _mutableSegment.index(badRow, defaultMetadata);
+    assertEquals(_mutableSegment.getNumDocsIndexed(), 2);
+    
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+        ImmutableRoaringBitmap.bitmapOf(0, 1));
+    
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(1),
+        ImmutableRoaringBitmap.bitmapOf(1));
+    verify(_serverMetrics, 
times(1)).addMeteredTableValue(matches("-JSON-indexingError"),
+        eq(ServerMeter.INDEXING_FAILURES), eq(1L));
+    reset(_serverMetrics);
+
+    GenericRow anotherGoodRow = new GenericRow();
+    anotherGoodRow.putValue(INT_COL, 2);
+    anotherGoodRow.putValue(STRING_COL, "c");
+    anotherGoodRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+    _mutableSegment.index(anotherGoodRow, defaultMetadata);
+    assertEquals(_mutableSegment.getNumDocsIndexed(), 3);
+    
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(1),
+        ImmutableRoaringBitmap.bitmapOf(2));
+    
assertEquals(_mutableSegment.getDataSource(STRING_COL).getInvertedIndex().getDocIds(2),
+        ImmutableRoaringBitmap.bitmapOf(2));
+    
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
 = 'json'"),
+        ImmutableRoaringBitmap.bitmapOf(0, 2));
+    verify(_serverMetrics, 
never()).addMeteredTableValue(matches("indexingError$"), 
eq(ServerMeter.INDEXING_FAILURES),
+        anyLong());
+    reset(_serverMetrics);
+
+    GenericRow nullStringRow = new GenericRow();
+    nullStringRow.putValue(INT_COL, 0);
+    nullStringRow.putValue(STRING_COL, null);
+    nullStringRow.addNullValueField(STRING_COL);
+    nullStringRow.putValue(JSON_COL, "{\"valid\": \"json\"}");
+    _mutableSegment.index(nullStringRow, defaultMetadata);
+    assertEquals(_mutableSegment.getNumDocsIndexed(), 4);
+    
assertEquals(_mutableSegment.getDataSource(INT_COL).getInvertedIndex().getDocIds(0),
+        ImmutableRoaringBitmap.bitmapOf(0, 1, 3));
+    
assertEquals(_mutableSegment.getDataSource(JSON_COL).getJsonIndex().getMatchingDocIds("valid
 = 'json'"),
+        ImmutableRoaringBitmap.bitmapOf(0, 2, 3));
+    
assertTrue(_mutableSegment.getDataSource(STRING_COL).getNullValueVector().isNull(3));
+    // null string value skipped
+    verify(_serverMetrics, 
times(1)).addMeteredTableValue(matches("DICTIONARY-indexingError$"),
+        eq(ServerMeter.INDEXING_FAILURES),
+        eq(1L));
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 6c2e372..e715cdd 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
+import java.util.Collections;
 import java.util.Set;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
@@ -57,6 +59,22 @@ public class MutableSegmentImplTestUtils {
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, boolean aggregateMetrics,
       boolean nullHandlingEnabled, UpsertConfig upsertConfig, String 
timeColumnName,
       PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+    return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
+        Collections.emptySet(), aggregateMetrics, nullHandlingEnabled, 
upsertConfig, timeColumnName,
+        partitionUpsertMetadataManager, null);
+  }
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, Set<String> jsonIndexColumns,
+      ServerMetrics serverMetrics) {
+    return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
+        jsonIndexColumns, false, true, null, null, null, serverMetrics);
+  }
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, Set<String> jsonIndexColumns,
+      boolean aggregateMetrics, boolean nullHandlingEnabled, UpsertConfig 
upsertConfig, String timeColumnName,
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
ServerMetrics serverMetrics) {
     RealtimeSegmentStatsHistory statsHistory = 
mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -68,13 +86,13 @@ public class MutableSegmentImplTestUtils {
     RealtimeSegmentConfig realtimeSegmentConfig =
         new 
RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
             
.setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
-            
.setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
+            
.setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexColumns(jsonIndexColumns)
             
.setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
             .setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
             .setMemoryManager(new 
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
             
.setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction)
             
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
-    return new MutableSegmentImpl(realtimeSegmentConfig, null);
+    return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to