deemoliu commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r655708998



##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler for partial-upsert.
+ */
+public class PartialUpsertHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+  private final Map<String, PartialUpsertMerger> _mergers = new HashMap<>();

Review comment:
       gotcha, thanks for reviewing, will refactor this part.

##########
File path: 
pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.Quickstart.Color;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+import static org.apache.pinot.tools.Quickstart.printStatus;
+
+
+public class PartialUpsertQuickStart {
+  private StreamDataServerStartable _kafkaStarter;
+
+  public static void main(String[] args)
+      throws Exception {
+    PluginManager.get().init();
+    new PartialUpsertQuickStart().execute();
+  }
+
+  // Todo: add a quick start demo
+  public void execute()
+      throws Exception {
+    File quickstartTmpDir = new File(FileUtils.getTempDirectory(), 
String.valueOf(System.currentTimeMillis()));
+    File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
+    File dataDir = new File(bootstrapTableDir, "data");
+    Preconditions.checkState(dataDir.mkdirs());
+
+    File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
+    File tableConfigFile = new File(bootstrapTableDir, 
"meetupRsvp_realtime_table_config.json");
+
+    ClassLoader classLoader = Quickstart.class.getClassLoader();
+    URL resource = 
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, schemaFile);
+    resource =
+        
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, tableConfigFile);
+
+    QuickstartTableRequest request = new 
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
+    final QuickstartRunner runner = new 
QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
+
+    printStatus(Color.CYAN, "***** Starting Kafka *****");
+    final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
+    try {
+      _kafkaStarter = 
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+          KafkaStarterUtils.getDefaultKafkaConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + 
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    _kafkaStarter.start();
+    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(2));
+    printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing 
to Kafka *****");
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
+    meetupRSVPProvider.run();
+    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
+    runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
+        runner.stop();
+        meetupRSVPProvider.stopPublishing();
+        _kafkaStarter.stop();
+        ZkStarter.stopLocalZkServer(zookeeperInstance);
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
+    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
+    runner.bootstrapTable();
+    printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to 
get populated *****");
+    Thread.sleep(15000);
+
+    printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
+
+    String q1 = "select event_id, count(*), sum(rsvp_count) from meetupRsvp 
group by event_id order by sum(rsvp_count) desc limit 10";
+    printStatus(Color.YELLOW, "Total number of documents, total number of 
rsvp_counts per event_id in the table");

Review comment:
       gotcha, added comments.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +63,170 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to 
improve the recovery speed.

Review comment:
       The index here refers to the primary key (PK) to record location mapping. cc'ed @Jackie-Jiang. I 
re-phrased this comment a bit. 

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +63,170 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to 
improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> 
getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
-  }
-
-  // TODO(upset): consider an off-heap KV store to persist this index to 
improve the recovery speed.
-  @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
   /**
-   * Initializes the upsert metadata for the given immutable segment, returns 
the valid doc ids for the segment.
+   * Initializes the upsert metadata for the given immutable segment.
    */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, 
Iterator<RecordInfo> recordInfoIterator) {
+  public void addSegment(IndexSegment segment, Iterator<RecordInfo> 
recordInfoIterator) {
+    String segmentName = segment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+    assert validDocIds != null;
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater 
than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same 
segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie 
to keep the newer record. Note that the
-            //      record info iterator will return records with incremental 
doc ids.
-            //   2. The current record location is pointing to the old segment 
being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed 
segment. In this case, we want to update
-            //      the record location when there is a tie because the record 
locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, 
do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer 
record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (segment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) 
{
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+              return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or 
the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the 
segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() 
|| (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName
-                    
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced

Review comment:
       @chenboat This case happens when reloading a completed segment. The old 
segment is being replaced.
   cc'ed @Jackie-Jiang 
   

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public interface PartialUpsertMerger {
+  /**
+   * Handle partial upsert merge.
+   *
+   * @param previousValue the last derived full record during ingestion.
+   * @param currentValue the new consumed record.

Review comment:
       Thanks for pointing this out. The Javadoc was not correct; I have updated the 
Javadoc for this function.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -470,28 +466,32 @@ public void addExtraColumns(Schema newSchema) {
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
       throws IOException {
-    // Update dictionary first
-    updateDictionary(row);
-
-    // If metrics aggregation is enabled and if the dimension values were 
already seen, this will return existing docId,
-    // else this will return a new docId.
-    int docId = getOrCreateDocId();
-
     boolean canTakeMore;
-    if (docId == _numDocsIndexed) {
-      // New row
+    if (isUpsertEnabled()) {
+      row = handleUpsert(row, _numDocsIndexed);
+
+      updateDictionary(row);
       addNewRow(row);
       // Update number of documents indexed at last to make the latest row 
queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
-
-      if (isUpsertEnabled()) {
-        handleUpsert(row, docId);
-      }
     } else {
-      Preconditions.checkArgument(!isUpsertEnabled(), "metrics aggregation 
cannot be used with upsert");
-      assert _aggregateMetrics;
-      aggregateMetrics(row, docId);
-      canTakeMore = true;
+      // Update dictionary first
+      updateDictionary(row);
+
+      // If metrics aggregation is enabled and if the dimension values were 
already seen, this will return existing
+      // docId, else this will return a new docId.
+      int docId = getOrCreateDocId();
+
+      if (docId == _numDocsIndexed) {
+        // New row
+        addNewRow(row);
+        // Update number of documents indexed at last to make the latest row 
queryable
+        canTakeMore = _numDocsIndexed++ < _capacity;
+      } else {
+        assert _aggregateMetrics;

Review comment:
       Gotcha, added a Preconditions check here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to