deemoliu commented on code in PR #11584: URL: https://github.com/apache/pinot/pull/11584#discussion_r1325212461
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java: ########## @@ -41,16 +44,33 @@ public class PartialUpsertHandler { private final PartialUpsertMerger _defaultPartialUpsertMerger; private final List<String> _comparisonColumns; private final List<String> _primaryKeyColumns; + private final PartialUpsertRowMergeEvaluator _rowMerger; + private LazyRow _reusePreviousLazyRow; + private LazyRow _reuseNewLazyRow; + private Map<String, Object> _reuseRowMergerResult; - public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + public PartialUpsertHandler(Schema schema, UpsertConfig upsertConfig, List<String> comparisonColumns) { + _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { + for (Map.Entry<String, UpsertConfig.Strategy> entry : upsertConfig.getPartialUpsertStrategies().entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); } + + String rowMergerCustomImplementation = upsertConfig.getRowMergerCustomImplementation(); + if (rowMergerCustomImplementation != null && !rowMergerCustomImplementation.equals("")) { + try { + _rowMerger = PartialUpsertRowMergeEvaluatorFactory.getInstance(rowMergerCustomImplementation); + } catch (Exception e) { + throw new RuntimeException(e); + } + _reusePreviousLazyRow = new LazyRow(); + _reuseNewLazyRow = new LazyRow(); + _reuseRowMergerResult = new HashMap<>(); Review Comment: concurrent hashmap? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/LazyRow.java: ########## @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.readers; + +import java.io.IOException; +import java.util.HashMap; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * A wrapper class to read column values of a row. The wrapper can either be over a {@link IndexSegment} and docId or + * over a {@link GenericRow}<br> + * The advantage of having wrapper over segment and docId is column values are read only when + * {@link LazyRow#getValue(String)} is invoked. + * This is useful to reduce the disk reads incurred due to loading the previous row during merge step. + * There isn't any advantage to have a LazyRow wrap a GenericRow but has been kept for syntactic sugar. + */ +public class LazyRow { + private IndexSegment _segment; + private int _docId; + private GenericRow _row; + + private HashMap<String, Object> _fieldToValueMap = new HashMap<>(); Review Comment: concurrentHashMap ? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertRowMergeEvaluatorFactory.java: ########## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PartialUpsertRowMergeEvaluatorFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertRowMergeEvaluatorFactory.class); + + private PartialUpsertRowMergeEvaluatorFactory() { + } + + public static PartialUpsertRowMergeEvaluator getInstance(String implementationClassName) { + LOGGER.info("Creating BiFunctionEvaluator for class {}", implementationClassName); + if (StringUtils.isBlank(implementationClassName)) { + throw new IllegalArgumentException("Empty implementationClassName"); + } + try { + Class<?> aClass = Class.forName(implementationClassName); + if (!PartialUpsertRowMergeEvaluator.class.isAssignableFrom(aClass)) { + throw new IllegalArgumentException( + "The provided class is not an implementation of BiFunctionEvaluator"); Review Comment: BiFunctionEvaluator --> 
PartialUpsertRowMergeEvaluator ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java: ########## @@ -145,6 +137,32 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD } } + /** + * Reuses a PartialUpsertHandler if there is no custom row merger otherwise creates a new instance to avoid + * concurrent modification of state(LazyRow) between segments of the table + * @return partial upsert handler to merge previous and new row + */ + protected PartialUpsertHandler getPartialUpsertHandler() { + UpsertConfig upsertConfig = _tableConfig.getUpsertConfig(); + if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { + Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + Preconditions.checkArgument(partialUpsertStrategies != null, + "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", + _tableNameWithType); + if (upsertConfig.getRowMergerCustomImplementation() != null) { + // In case of custom merger, the PUH is dependent on LazyRow object to be reused. LazyRow is stateful and + // cause concurrent modification issues, hence a new PUH is created per partition + return new PartialUpsertHandler(_schema, upsertConfig, _comparisonColumns); Review Comment: Got it. Currently the partial upsert handler is initialized in the upsertTableManager and shared by all partitionUpsertManagers. Once LazyRow is passed into the partialUpsertHandler, the handler becomes stateful, and multiple partitions can access it at the same time; this leads to concurrent modification issues. So we have to avoid sharing the partial upsert handler among partitions. @Jackie-Jiang do you see any improvement on this approach?
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertRowMergeEvaluatorFactory.java: ########## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PartialUpsertRowMergeEvaluatorFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertRowMergeEvaluatorFactory.class); + + private PartialUpsertRowMergeEvaluatorFactory() { + } + + public static PartialUpsertRowMergeEvaluator getInstance(String implementationClassName) { + LOGGER.info("Creating BiFunctionEvaluator for class {}", implementationClassName); Review Comment: BiFunctionEvaluator --> PartialUpsertRowMergeEvaluator -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org