This is an automated email from the ASF dual-hosted git repository. jamesshao pushed a commit to branch upsert-refactor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 39f5ba8d4770ae0a1b1c23ef02965ff430c7b78b Author: james Shao <[email protected]> AuthorDate: Thu Mar 19 14:34:46 2020 -0700 update to fix unit tests --- .../requesthandler/BaseBrokerRequestHandler.java | 2 +- .../broker/upsert/DefaultLowWaterMarkService.java | 11 +++--- .../broker/upsert/DefaultUpsertQueryRewriter.java | 6 ++-- .../broker/upsert/LowWaterMarkServiceProvider.java | 18 +++++++--- .../core/data/manager/BaseTableDataManager.java | 1 + .../core/data/manager/InstanceDataManager.java | 5 --- .../realtime/LLRealtimeSegmentDataManager.java | 5 ++- .../manager/realtime/RealtimeTableDataManager.java | 13 ++++---- .../data/manager/upsert/DataManagerCallback.java | 37 +++++++++++++++++++- .../upsert/DefaultIndexSegmentCallback.java | 8 ++--- .../DefaultTableDataManagerCallbackImpl.java | 14 ++++++-- .../data/manager/upsert/IndexSegmentCallback.java | 31 +++++++++++++++++ .../manager/upsert/TableDataManagerCallback.java | 39 ++++++++++++++++++++-- .../upsert/TableDataManagerCallbackProvider.java | 24 +++++++++---- .../segment/updater/DefaultWaterMarkManager.java | 4 ++- .../core/segment/updater/LowWaterMarkService.java | 39 +++++++++++++++------- ...UpsertQueryRewriter.java => QueryRewriter.java} | 20 +++++++---- .../segment/updater/UpsertComponentContainer.java | 34 +++++++++++++++++++ .../core/segment/updater/WaterMarkManager.java | 27 +++++++++++++-- .../SegmentGenerationWithNullValueVectorTest.java | 2 ++ .../pinot/query/executor/QueryExecutorTest.java | 2 ++ .../upsert/PollingBasedLowWaterMarkService.java | 6 ++-- .../broker/upsert/UpsertQueryRewriterImpl.java | 6 ++-- .../upsert/UpsertTableDataManagerCallbackImpl.java | 18 ++++++++-- .../segment/updater/UpsertWaterMarkManager.java | 26 ++++++++++++++- ...terImplTest.java => QueryRewriterImplTest.java} | 2 +- .../starter/helix/HelixInstanceDataManager.java | 2 +- .../upsert/UpsertComponentContainerProvider.java | 4 --- .../apache/pinot/server/api/BaseResourceTest.java | 4 +++ 29 files changed, 332 
insertions(+), 78 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index cb17740..4b02cd4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (shouldEnableLowWaterMarkRewrite(request)) { // Augment the realtime request with LowWaterMark constraints. - _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName); + _lwmService.getQueryRewriter().maybeRewriteQueryForUpsert(realtimeBrokerRequest, rawTableName); } // Calculate routing table for the query diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java index 42e1dcb..eb71890 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java @@ -22,13 +22,16 @@ import com.google.common.collect.ImmutableMap; import org.apache.helix.HelixDataAccessor; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.core.segment.updater.LowWaterMarkService; -import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; +import org.apache.pinot.core.segment.updater.QueryRewriter; import java.util.Map; +/** + * default class to handle any low watermark operation on pinot broker, mostly no-op + */ public class DefaultLowWaterMarkService implements LowWaterMarkService { - private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter(); + private QueryRewriter 
queryRewriter = new DefaultUpsertQueryRewriter(); @Override public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, @@ -49,7 +52,7 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService { } @Override - public UpsertQueryRewriter getQueryRewriter() { - return upsertQueryRewriter; + public QueryRewriter getQueryRewriter() { + return queryRewriter; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java index d18a56f..97195a3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java @@ -1,7 +1,7 @@ package org.apache.pinot.broker.upsert; import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; +import org.apache.pinot.core.segment.updater.QueryRewriter; /** * Licensed to the Apache Software Foundation (ASF) under one @@ -21,10 +21,10 @@ import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; * specific language governing permissions and limitations * under the License. 
*/ -public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter { +public class DefaultUpsertQueryRewriter implements QueryRewriter { @Override - public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) { + public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) { // do nothing } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java index f5c06f3..56b3bff 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.broker.upsert; -import com.google.common.base.Preconditions; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.helix.HelixDataAccessor; @@ -29,21 +28,28 @@ import org.slf4j.LoggerFactory; import static org.apache.pinot.common.utils.CommonConstants.Broker.*; - +/** + * provider to initialize LowWaterMarkServer for pinot broker + */ public class LowWaterMarkServiceProvider { private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class); private LowWaterMarkService _instance; + /** + * create a new provider instance + * @param brokerConfig config for this provider to create the actual class reference, + * refer to {@value CommonConstants.Broker#CONFIG_OF_BROKER_LWMS_CLASS_NAME} + * @param dataAccessor helix data access to help low watermark service to find proper server cluster + * @param clusterName cluster name for the current pinot cluster + */ public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) { String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME, 
DefaultLowWaterMarkService.class.getName()); LOGGER.info("creating watermark manager with class {}", className); try { Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className); - Preconditions.checkState(comonentContainerClass.isAssignableFrom(LowWaterMarkService.class), - "configured class not assignable from LowWaterMarkService class"); _instance = comonentContainerClass.newInstance(); _instance.init(dataAccessor, clusterName, brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS, @@ -57,6 +63,10 @@ public class LowWaterMarkServiceProvider { } } + /** + * fetch the current instance of low watermark service this provider created + * @return + */ public LowWaterMarkService getInstance() { return _instance; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index cb278b4..6c35b80 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -107,6 +107,7 @@ public abstract class BaseTableDataManager implements TableDataManager { * <p>The new segment is added with reference count of 1, so that is never removed until a drop command comes through. 
* * @param immutableSegment Immutable segment to add + * @param dataManagerCallback callback for performing any other necessary operation for other ingestion models */ @Override public void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index eba0689..8f51bd9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager; import java.io.File; import java.util.List; -import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -132,8 +131,4 @@ public interface InstanceDataManager { */ ZkHelixPropertyStore<ZNRecord> getPropertyStore(); -// /** -// * Return the mappings from partition -> low water marks of all the tables hosted in this server. 
-// */ -// Map<String, Map<Integer, Long>> getLowWaterMarks(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 4f3ae50..dc2f748 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1056,7 +1056,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics, - DataManagerCallback dataManagerCallback) { + DataManagerCallback dataManagerCallback) throws IOException { _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore(); _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata; _tableConfig = tableConfig; @@ -1219,6 +1219,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _indexLoadingConfig, _protocolHandler); + // init virtual columns + _dataManagerCallback.initVirtualColumns(); + segmentLogger .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index bc5316d..fd42627 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -39,7 +39,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.NamedThreadFactory; @@ -229,11 +228,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // of the index directory and loading segment from it LoaderUtils.reloadFailureRecovery(indexDir); + // tell callback to add segment + _tableDataManagerCallback.addSegment(_tableNameWithType, segmentName, tableConfig); + if (indexDir.exists() && (realtimeSegmentZKMetadata.getStatus() == Status.DONE)) { // Segment already exists on disk, and metadata has been committed. Treat it like an offline segment - - DataManagerCallback callback = _tableDataManagerCallback - .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false); + final DataManagerCallback callback = _tableDataManagerCallback + .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics); addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, callback), callback); } else { // Either we don't have the segment on disk or we have not committed in ZK. 
We should be starting the consumer @@ -267,7 +268,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics, - _tableDataManagerCallback.getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, true)); + _tableDataManagerCallback.getMutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics)); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); _segmentDataManagerMap.put(segmentName, manager); @@ -321,7 +322,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { try { File indexDir = new File(_indexDir, segmentName); DataManagerCallback dataManagerCallback = _tableDataManagerCallback - .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false); + .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics); addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, dataManagerCallback), dataManagerCallback); } catch (Exception e) { throw new RuntimeException(e); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java index 466553e..91b08d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java @@ -18,26 +18,61 @@ */ package org.apache.pinot.core.data.manager.upsert; +import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry; -import org.apache.pinot.spi.data.Schema; import 
org.apache.pinot.spi.data.readers.GenericRow; import java.io.IOException; import java.util.List; +/** + * component inject to {@link org.apache.pinot.core.data.manager.SegmentDataManager} for handling extra logics for + * other workflows other than regular append-mode ingestion. We are expected to provide appropriate link to class + * during run time + */ public interface DataManagerCallback { + /** + * create a callback component for {@link org.apache.pinot.core.indexsegment.IndexSegment} when + * {@link org.apache.pinot.core.data.manager.SegmentDataManager} create one. + * @return callback associated with the internal index segment this data manager holds + */ IndexSegmentCallback getIndexSegmentCallback(); + /** + * process the row after transformation in the ingestion process + * @param row the row of newly ingested and transformed data from upstream + * @param offset the offset of this particular row + */ void processTransformedRow(GenericRow row, long offset); + /** + * process the row after we have finished the index the current row + * @param row the row we just index to the current segment + * @param offset the offset associated with the index + */ void postIndexProcessing(GenericRow row, long offset); + /** + * callback for when a realtime segment data manager done with the current consumption loop for all data associated + * with it + */ void postConsumeLoop(); + /** + * initialize all virtual columns for the current data manager associated with upsert component (if necessary) + * @throws IOException + */ void initVirtualColumns() throws IOException; + /** + * update the data in the virtual columns from segment updater loop if necessary + * @param messages list of update log entries for the current datamanager + */ void updateVirtualColumns(List<UpdateLogEntry> messages); + /** + * callback when the associated data manager is destroyed by pinot server in call {@link SegmentDataManager#destroy()} + */ void destroy(); } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java index 0299cb7..a0d9acf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java @@ -27,6 +27,9 @@ import org.apache.pinot.spi.data.readers.GenericRow; import java.util.Map; +/** + * no-op callback for non-upsert table/pinot server instance + */ public class DefaultIndexSegmentCallback implements IndexSegmentCallback { public static final DefaultIndexSegmentCallback INSTANCE = new DefaultIndexSegmentCallback(); @@ -35,27 +38,22 @@ public class DefaultIndexSegmentCallback implements IndexSegmentCallback { @Override public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) { - // do nothing } @Override public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) { - // do noting } @Override public void postProcessRecords(GenericRow row, int docId) { - // do nothing } @Override public void initVirtualColumn() { - // do nothing } @Override public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) { - // do nothing } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java index 2e56455..a0f4398 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java @@ -22,6 +22,10 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.ServerMetrics; import 
org.apache.pinot.spi.data.Schema; +/** + * Class for no-op callback for pinot cluster/table that don't support upsert + * We will also use this for pinot tables that don't configured to use upsert semantic + */ public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCallback { private static final DefaultDataManagerCallbackImpl DEFAULT_DM_CALLBACK = DefaultDataManagerCallbackImpl.INSTANCE; @@ -35,8 +39,14 @@ public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCall } @Override - public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, - Schema schema, ServerMetrics serverMetrics, boolean isMutable) { + public DataManagerCallback getMutableDataManagerCallback(String tableName, String segmentName, + Schema schema, ServerMetrics serverMetrics) { + return DEFAULT_DM_CALLBACK; + } + + @Override + public DataManagerCallback getImmutableDataManagerCallback(String tableName, String segmentName, + Schema schema, ServerMetrics serverMetrics) { return DEFAULT_DM_CALLBACK; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java index dd198f9..8b3a14d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java @@ -27,17 +27,48 @@ import org.apache.pinot.spi.data.readers.GenericRow; import java.io.IOException; import java.util.Map; +/** + * callback for handling any upsert-related operations in subclass of + * {@link org.apache.pinot.core.indexsegment.IndexSegment} if necessary + */ public interface IndexSegmentCallback { + /** + * initialize the callback from {@link org.apache.pinot.core.indexsegment.IndexSegment} + * @param segmentMetadata the metadata associated with the curreng segment + * @param 
virtualColumnIndexReader + */ void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader); + /** + * initialize offset column for in-memory access + * @param offsetColumnContainer the column that stores the offset data + */ void initOffsetColumn(ColumnIndexContainer offsetColumnContainer); + /** + * perform any operation from the callback for the given row after it has been processed and indexed + * @param row the current pinot row we just indexed into the current IndexSegment + * @param docId the docId of this record + */ void postProcessRecords(GenericRow row, int docId); + /** + * initialize set of upsert-related virtual columns if necessary + * @throws IOException + */ void initVirtualColumn() throws IOException; + /** + * update upsert-related virtual column from segment updater if necessary + * @param logEntries the update log entries used to update the virtual columns + */ void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries); + /** + * retrieve information related to an upsert-enabled segment virtual column for debugging purposes + * @param offset the offset of the record we are trying to get the virtual column data for + * @return string representation of the virtual column data information + */ String getVirtualColumnInfo(long offset); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java index ec6a15f..7501961 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java @@ -20,16 +20,49 @@ package org.apache.pinot.core.data.manager.upsert; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.spi.data.Schema; +import
java.io.File; + +/** + * component inject to {@link org.apache.pinot.core.data.manager.TableDataManager} for handling extra logics for + * other workflows other than regular append-mode ingestion. We are expected to provide appropriate link to class + * during run time + */ public interface TableDataManagerCallback { + /** + * initialize the callback object during {@link org.apache.pinot.core.data.manager.TableDataManager#init} + * ensure any internal component for this callback is properly created during the start time + */ void init(); - void addSegment(String tableName, String segmentName, TableConfig tableConfig); + /** + * callback to ensure other components related to the callback are added when + * {@link org.apache.pinot.core.data.manager.TableDataManager#addSegment(File, IndexLoadingConfig)} + * is executed + */ + void addSegment(String tableNameWithType, String segmentName, TableConfig tableConfig); + + /** + * return a callback object for a mutable segment data manager callback component when a table creates a new + * mutable {@link org.apache.pinot.core.data.manager.SegmentDataManager} + */ + DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema, + ServerMetrics serverMetrics); - DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema, - ServerMetrics serverMetrics, boolean isMutable); + /** + * return a callback object for an immutable segment data manager callback component when a table creates a new + * immutable {@link org.apache.pinot.core.data.manager.SegmentDataManager} + */ + DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema, + ServerMetrics serverMetrics); + /** + * create a no-op default callback for segmentDataManager that don't support upsert + * (eg, offline table, HLL consumers etc) + * @return a no-op default callback for data manager + */ DataManagerCallback
getDefaultDataManagerCallback(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java index 2e868a2..1f08a90 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.data.manager.upsert; -import com.google.common.base.Preconditions; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -27,6 +26,10 @@ import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * class for creating appropriate {@link TableDataManagerCallback} depends on the config + * allow upsert-enabled pinot server to inject proper logics while keeping append-only pinot server keep the same + */ public class TableDataManagerCallbackProvider { private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerCallbackProvider.class); @@ -38,6 +41,12 @@ public class TableDataManagerCallbackProvider { public static final String DEFAULT_CALLBACK_CLASS_CONFIG_KEY = "append.tableDataManager.callback"; public static final String CALLBACK_CLASS_CONFIG_DEFAULT = DefaultTableDataManagerCallbackImpl.class.getName(); + /** + * initialize table data manager callback provider + * the most information config will be {@value UPSERT_CALLBACK_CLASS_CONFIG_KEY} for creating the proper + * callback injection for upsert pinot server + * @param configuration + */ public TableDataManagerCallbackProvider(Configuration configuration) { String appendClassName = configuration.getString(DEFAULT_CALLBACK_CLASS_CONFIG_KEY, 
CALLBACK_CLASS_CONFIG_DEFAULT); String upsertClassName = configuration.getString(UPSERT_CALLBACK_CLASS_CONFIG_KEY); @@ -47,8 +56,6 @@ public class TableDataManagerCallbackProvider { LOGGER.error("failed to load table data manager class {}", appendClassName, e); ExceptionUtils.rethrow(e); } - Preconditions.checkState(defaultTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class), - "configured class not assignable from Callback class", defaultTableDataManagerCallBackClass); if (StringUtils.isNotEmpty(upsertClassName)) { try { upsertTableDataManagerCallBackClass = (Class<TableDataManagerCallback>) Class.forName(upsertClassName); @@ -56,11 +63,13 @@ public class TableDataManagerCallbackProvider { LOGGER.error("failed to load table data manager class {}", upsertClassName); ExceptionUtils.rethrow(e); } - Preconditions.checkState(upsertTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class), - "configured class not assignable from Callback class"); } } + /** + * create a proper callback for the table, depends on whether the table is configured for upsert or not + * @param tableDataManagerConfig the config for the table + */ public TableDataManagerCallback getTableDataManagerCallback(TableDataManagerConfig tableDataManagerConfig) { if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) { return getUpsertTableDataManagerCallback(); @@ -69,7 +78,7 @@ public class TableDataManagerCallbackProvider { } } - public TableDataManagerCallback getUpsertTableDataManagerCallback() { + private TableDataManagerCallback getUpsertTableDataManagerCallback() { try { return upsertTableDataManagerCallBackClass.newInstance(); } catch (Exception ex) { @@ -79,6 +88,9 @@ public class TableDataManagerCallbackProvider { return null; } + /** + * create a tabledatamanager for a non-upsert enabled tables, ensure to use the original pinot workflow + */ public TableDataManagerCallback getDefaultTableDataManagerCallback() { 
try { return defaultTableDataManagerCallBackClass.newInstance(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java index b45060c..7c53467 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java @@ -24,13 +24,15 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics; import java.util.Map; +/** + * default no-op watermark manager for pinot + */ public class DefaultWaterMarkManager implements WaterMarkManager { private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of(); @Override public void init(Configuration config, GrigioMetrics metrics) { - } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java index 0c6756c..e9cc5f9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java @@ -26,19 +26,34 @@ import java.util.Map; /** * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of * an input table. + * It runs on pinot broker to fetch lwm information from pinot server periodically + * and use that to rewrite pinot query periodically */ public interface LowWaterMarkService { - void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort); - - // Return the low water mark mapping from partition id to the corresponding low water mark of a given table. - Map<Integer, Long> getLowWaterMarks(String tableName); - - // Shutdown the service. 
- void shutDown(); - - // start - void start(BrokerMetrics brokerMetrics); - - UpsertQueryRewriter getQueryRewriter(); + void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort); + + /** + * the low water mark mapping from partition id to the corresponding low water mark of a given table. + * @param tableNameWithType + * @return map of partition to lowWatermark + */ + Map<Integer, Long> getLowWaterMarks(String tableNameWithType); + + /** + * shutdown low water mark service and its background threads (if any) + */ + void shutDown(); + + /** + * start the current low watermark service + * @param brokerMetrics pinot broker metrics for lwm service to report its status to + */ + void start(BrokerMetrics brokerMetrics); + + /** + * get a queryrewriter to ensure that we can rewrite a query if the target table is upsert-enabled table + * @return + */ + QueryRewriter getQueryRewriter(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java similarity index 71% rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java index 64a64d0..40abd9a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java @@ -1,7 +1,3 @@ -package org.apache.pinot.core.segment.updater; - -import org.apache.pinot.common.request.BrokerRequest; - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,8 +16,20 @@ import org.apache.pinot.common.request.BrokerRequest; * specific language governing permissions and limitations * under the License. 
*/ -public interface UpsertQueryRewriter { +package org.apache.pinot.core.segment.updater; + +import org.apache.pinot.common.request.BrokerRequest; + +/** + * class that rewrite pinot broker sql for upsert or other purpose + */ +public interface QueryRewriter { - void rewriteQueryForUpsert(BrokerRequest request, String rawTableName); + /** + * rewrite the query for pinot upsert table if necessary + * @param request the pinot sql request that pinot broker requests + * @param rawTableName the raw + */ + void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java index eb64285..f1312e7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java @@ -22,21 +22,55 @@ import com.yammer.metrics.core.MetricsRegistry; import org.apache.commons.configuration.Configuration; import org.apache.helix.HelixManager; +/** + * contains all components related to upsert in pinot server + */ public interface UpsertComponentContainer { + /** + * register pinot upsert component metrics to the given registry + * @param prefix the prefix of all metrics + * @param registry the registry we are going to register the metrics to + */ void registerMetrics(String prefix, MetricsRegistry registry); + /** + * initialize the upsert comonent container with necessary config and information + * @param config the configuration for this upsert component + * @param helixManager helix manager for the current pinot server helix state + * @param clusterName helix cluster name for the current pinot cluster + * @param instanceName the name of current pinot instance in this cluster + */ void init(Configuration config, HelixManager helixManager, String 
clusterName, String instanceName); + /** + * start any necessary background processing for this upsert component + */ void startBackgroundThread(); + /** + * stop any necessary background processing for this upsert component + */ void stopBackgroundThread(); + /** + * shutdown and clean up any state for this upsert component + */ void shutdown(); + /** + * return a segment deletion callback component that should be invoked when pinot server removed a segment + * from its internal storage (to DROPPED state) + */ SegmentDeletionHandler getSegmentDeletionHandler(); + /** + * return the current watermark manager for this server + */ WaterMarkManager getWatermarkManager(); + /** + * check if upsert is enable for the current pinot server + */ boolean isUpsertEnabled(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java index acc8479..63ba5c9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java @@ -19,16 +19,37 @@ package org.apache.pinot.core.segment.updater; import org.apache.commons.configuration.Configuration; -import org.apache.pinot.grigio.common.metrics.GrigioMeter; import org.apache.pinot.grigio.common.metrics.GrigioMetrics; import java.util.Map; +/** + * class run on pinot server to keep track of the low-water-mark of each upsert table + * organized by partition + */ public interface WaterMarkManager { + /** + * initialize watermark manager + * @param config the configuration subset for waterMarkManager + * @param metrics the metrics for watermark manager + */ void init(Configuration config, GrigioMetrics metrics); + /** + * the highest epoch for each partition of each table in this pinot server + * @return mapping of {pinot_table_name: {partition_id: high_water_mark}} + * example as { + * 
"table1_REALTIME" : { + * "0" : 1400982, + * "1" : 1400982, + * "2" : 1400982, + * "3" : 1400982 + * }, + * "table2_REALTIME" : { + * "0" : 1401008, + * "1" : 1401008 + * } + */ Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap(); - - } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java index 4d1d469..a56f897 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java @@ -40,6 +40,7 @@ import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl; @@ -137,6 +138,7 @@ public class SegmentGenerationWithNullValueVectorTest { when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE"); when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME); when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath()); + TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class)); @SuppressWarnings("unchecked") TableDataManager tableDataManager = TableDataManagerProvider .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java 
b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java index 203d2f6..2eda11f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl; @@ -98,6 +99,7 @@ public class QueryExecutorTest { when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE"); when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME); when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath()); + TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class)); @SuppressWarnings("unchecked") TableDataManager tableDataManager = TableDataManagerProvider .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java index 74e4b41..c92d53e 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import 
org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; import org.apache.pinot.core.segment.updater.LowWaterMarkService; -import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; +import org.apache.pinot.core.segment.updater.QueryRewriter; import org.glassfish.jersey.client.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService { private int _serverPort; private boolean _shuttingDown; private BrokerMetrics _brokerMetrics; - private UpsertQueryRewriter _queryRewriter; + private QueryRewriter _queryRewriter; @Override public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) { @@ -104,7 +104,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService { } @Override - public UpsertQueryRewriter getQueryRewriter() { + public QueryRewriter getQueryRewriter() { return _queryRewriter; } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java index 90fce7c..a9cd1f0 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java @@ -25,7 +25,7 @@ import org.apache.pinot.common.request.FilterQuery; import org.apache.pinot.common.request.FilterQueryMap; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.core.segment.updater.LowWaterMarkService; -import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; +import org.apache.pinot.core.segment.updater.QueryRewriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 
+34,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -public class UpsertQueryRewriterImpl implements UpsertQueryRewriter { +public class UpsertQueryRewriterImpl implements QueryRewriter { private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class); protected final LowWaterMarkService _lwmService; @@ -48,7 +48,7 @@ public class UpsertQueryRewriterImpl implements UpsertQueryRewriter { } @Override - public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) { + public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) { final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName, CommonConstants.Helix.TableType.REALTIME); Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName); diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java index dc329e4..efbbf00 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.data.manager.upsert; - import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pinot.common.config.TableConfig; @@ -30,6 +29,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +/** + * class for handle all upsert related operation for interacting with segments for a given table at + * {@link org.apache.pinot.core.data.manager.TableDataManager} + */ public class UpsertTableDataManagerCallbackImpl implements 
TableDataManagerCallback { private static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableDataManagerCallbackImpl.class); @@ -51,7 +54,18 @@ public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallb } @Override - public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema, + public DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema, + ServerMetrics serverMetrics) { + return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, true); + } + + @Override + public DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName, + Schema schema, ServerMetrics serverMetrics) { + return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, false); + } + + private DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema, ServerMetrics serverMetrics, boolean isMutable) { return new UpsertDataManagerCallbackImpl(tableName, segmentName, schema, serverMetrics, isMutable); } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java index 266f25d2..4b3d861 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java @@ -31,6 +31,14 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +/** + * watermark manager for upsert component to collect the low-water-mark information of each tables in the current + * pinot server + * watermark is defined as largest version of each partition (segment update event topic 
partition) for each table + * so it stores the data in map of {table_name: {partition_id: highest_water_mark}} + * then {@link LowWaterMarkService} will ingest those information from pinot server and calculate the lowest of these + * watermark and use it in query to send to server + */ public class UpsertWaterMarkManager implements WaterMarkManager { private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>(); @@ -55,7 +63,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager { return _instance; } - // TODO(tingchen) Look into the case where Segment Update Messages might arrive before the corresponding physical data. + /** + * process a event message and update the current watermark information for this manager + * @param table + * @param segment + * @param logEntry the message containing the new watermark information + */ public void processMessage(String table, String segment, UpdateLogEntry logEntry) { if (logEntry == null) { return; @@ -65,6 +78,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager { processVersionUpdate(table, partition, version); } + /** + * update the high watermark information associated with the given table/partition + * @param table + * @param partition + * @param version + */ public void processVersionUpdate(String table, int partition, long version) { Preconditions.checkState(partition >= 0, "logEntry has invalid version {} for table {}", version, table); @@ -76,6 +95,11 @@ public class UpsertWaterMarkManager implements WaterMarkManager { } } + /** + * return the highest watermark for each partition of the given table + * @param tableName + * @return + */ public Map<Integer, Long> getHighWaterMarkForTable(String tableName) { return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap.getOrDefault(tableName, ImmutableMap.of())); } diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java 
b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java similarity index 99% rename from pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java index ac6c38d..5e6b703 100644 --- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java +++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java @@ -33,7 +33,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -public class UpsertQueryRewriterImplTest { +public class QueryRewriterImplTest { private LowWaterMarkService _lwms; private UpsertQueryRewriterImpl rewriter; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 710e22a..b06000f 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -221,7 +221,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { final TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); final DataManagerCallback dataManagerCallback = tableDataManager.getTableDataManagerCallback() - .getDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics, false); + .getImmutableDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics); // Load from index directory ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java 
b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java index 7ae27a6..ab76e36 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java @@ -18,10 +18,8 @@ */ package org.apache.pinot.server.upsert; -import com.google.common.base.Preconditions; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pinot.core.segment.updater.UpsertComponentContainer; -import org.apache.pinot.core.segment.updater.WaterMarkManager; import org.apache.pinot.server.conf.ServerConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +35,6 @@ public class UpsertComponentContainerProvider { LOGGER.info("creating watermark manager with class {}", className); try { Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className); - Preconditions.checkState(comonentContainerClass.isAssignableFrom(WaterMarkManager.class), - "configured class not assignable from Callback class"); _instance = comonentContainerClass.newInstance(); } catch (Exception e) { LOGGER.error("failed to load watermark manager class", className, e); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 60702c4..aec1510 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; import 
org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl; @@ -41,6 +42,7 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager; import org.apache.pinot.segments.v1.creator.SegmentTestUtils; import org.apache.pinot.server.starter.ServerInstance; import org.apache.pinot.server.starter.helix.AdminApiApplication; @@ -83,6 +85,7 @@ public abstract class BaseResourceTest { // Mock the server instance ServerInstance serverInstance = mock(ServerInstance.class); + when(serverInstance.getWatermarkManager()).thenReturn(new DefaultWaterMarkManager()); when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager); // Add the default table and segment @@ -133,6 +136,7 @@ public abstract class BaseResourceTest { when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE"); when(tableDataManagerConfig.getTableName()).thenReturn(tableName); when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath()); + TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class)); TableDataManager tableDataManager = TableDataManagerProvider .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
