This is an automated email from the ASF dual-hosted git repository. jamesshao pushed a commit to branch upsert-refactor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 022253fb7fdadc256851f0cd74c8410a16962657 Author: Uber Jenkins <[email protected]> AuthorDate: Thu Feb 13 23:55:51 2020 +0000 fix upsert data issue Summary: fix an upsert data issue caused by non-thread-safe default set creation; also refactor some code, abstract a new class, and add documentation and tests Reviewers: bzzhang, tingchen, #streaming_pinot Reviewed By: bzzhang, #streaming_pinot Differential Revision: https://code.uberinternal.com/D3915319 --- .../VirtualColumnLongValueReaderWriter.java | 7 +- .../SegmentUpdateLogStorageProvider.java | 9 +- .../storageProvider/UpdateLogStorageExplorer.java | 25 ++- .../UpsertImmutableIndexSegmentCallback.java | 44 +++-- .../pinot/core/segment/updater/SegmentUpdater.java | 74 ++++----- .../updater/SegmentUpdaterDataManagerHolder.java | 131 +++++++++++++++ .../SegmentUpdaterDataManagerHolderTest.java | 184 +++++++++++++++++++++ .../server/api/resources/UpsertDebugResource.java | 17 +- 8 files changed, 421 insertions(+), 70 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java index 78f3bda..dea717c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java @@ -64,7 +64,12 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu @Override public long getLong(int row) { - return _values[row]; + if (row >= 0 && row < _totalDocSize) { + return _values[row]; + } else { + throw new RuntimeException(String.format("trying to fetch row %d while we only have total row count %d", row, + _totalDocSize)); + } } @Override diff --git 
a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java index e7b035f..fa045d1 100644 --- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java +++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java @@ -32,6 +32,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * provide the storage abstraction of storing upsert update event logs to a local disk so we can reload it @@ -44,6 +45,7 @@ public class SegmentUpdateLogStorageProvider { protected final File _file; @VisibleForTesting protected final FileOutputStream _outputStream; + private AtomicInteger messageCountInFile = new AtomicInteger(0); public SegmentUpdateLogStorageProvider(File file) throws IOException { @@ -60,6 +62,7 @@ public class SegmentUpdateLogStorageProvider { ByteBuffer buffer = ByteBuffer.allocate(fileLength); readFullyFromBeginning(_file, buffer); int messageCount = fileLength / UpdateLogEntry.SIZE; + LOGGER.info("read {} messages from file {}", messageCount, _file.getName()); return new UpdateLogEntrySet(buffer, messageCount); } else { return UpdateLogEntrySet.getEmptySet(); @@ -74,7 +77,8 @@ public class SegmentUpdateLogStorageProvider { buffer.flip(); _outputStream.write(buffer.array()); _outputStream.flush(); - + messageCountInFile.getAndAdd(messages.size()); + LOGGER.debug("file {} message count {}", _file.getName(), messageCountInFile.get()); } public synchronized void destroy() throws IOException { @@ -102,6 +106,7 @@ public class SegmentUpdateLogStorageProvider { segmentUpdateFile.length(), newSize); 
channel.truncate(newSize); channel.force(false); + messageCountInFile.set(Math.toIntExact(newSize / UpdateLogEntry.SIZE)); } } @@ -116,7 +121,7 @@ public class SegmentUpdateLogStorageProvider { position += byteRead; } while (byteRead != -1 && buffer.hasRemaining()); buffer.flip(); - LOGGER.info("read all data from segment update file {} to buffer in {} ms", segmentUpdateFile.getName(), + LOGGER.info("read {} bytes from segment update file {} to buffer in {} ms", position, segmentUpdateFile.getName(), System.currentTimeMillis() - start); } diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java index 0e38deb..a2b53ee 100644 --- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java +++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java @@ -27,6 +27,7 @@ import org.apache.commons.configuration.PropertiesConfiguration; import java.io.IOException; import java.util.Collection; import java.util.Scanner; +import java.util.concurrent.atomic.AtomicInteger; /** * command line tools for debug pinot server by allowing us to interatively explore the update log data in pinot server/kc @@ -52,24 +53,38 @@ public class UpdateLogStorageExplorer { String[] inputSplits = input.split(" "); Preconditions.checkState(inputSplits.length == 2, "expect input data to be 'tableName segmentName'"); String tableName = inputSplits[0]; + if (!tableName.endsWith("_REALTIME")) { + tableName = tableName + "_REALTIME"; + } String segmentName = inputSplits[1]; provider.loadTable(tableName); UpdateLogEntrySet updateLogEntrySet = provider.getAllMessages(tableName, segmentName); - Multimap<Long, UpdateLogEntry> map = ArrayListMultimap.create(); + 
Multimap<Long, UpdateLogAndPos> map = ArrayListMultimap.create(); System.out.println("update log size: " + updateLogEntrySet.size()); + AtomicInteger pos = new AtomicInteger(0); updateLogEntrySet.forEach(u -> { - map.put(u.getOffset(), u); + map.put(u.getOffset(), new UpdateLogAndPos(u, pos.getAndIncrement())); }); while (true) { System.out.println("input the offset"); long offset = reader.nextLong(); - Collection<UpdateLogEntry> result = map.get(offset); + Collection<UpdateLogAndPos> result = map.get(offset); System.out.println("associated update logs size: " + result.size()); - for (UpdateLogEntry entry: result) { - System.out.println("content: " + entry.toString()); + for (UpdateLogAndPos entry: result) { + System.out.println("content: " + entry.logEntry.toString() + " pos " + entry.pos); } } } + + static class UpdateLogAndPos { + public UpdateLogEntry logEntry; + public int pos; + + public UpdateLogAndPos(UpdateLogEntry entry, int pos) { + this.logEntry = entry; + this.pos = pos; + } + } } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java index b7fcdb8..d0667b8 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java @@ -130,9 +130,9 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback throw new RuntimeException("unexpected forward reader type for kafka offset column " + reader.getClass()); } LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start); + LOGGER.info("immutable segment {} built offset 
map with minOffset {} and maxOffset {}", _segmentName, minOffset, maxOffset); } - @Override public void postProcessRecords(GenericRow row, int docId) { // do nothing } @@ -150,7 +150,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start); start = System.currentTimeMillis(); - final long maxOffset = _totalDoc + _minSourceOffset; + final long maxOffset = _minSourceOffset + _sourceOffsetToDocIdArray.length; int unmatchedLogEntryCount = 0; try { Map<Integer, Long> partitionToHighestWatermark = new HashMap<>(); @@ -193,13 +193,19 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback @Override public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) { for (UpdateLogEntry logEntry: logEntries) { - boolean updated = false; - int docId = getDocIdFromSourceOffset(logEntry.getOffset()); - for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) { - updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated; - } - if (updated) { - _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry); + try { + boolean updated = false; + int docId = getDocIdFromSourceOffset(logEntry.getOffset()); + if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) { + for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) { + updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated; + } + if (updated) { + _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry); + } + } + } catch (Exception ex) { + LOGGER.warn("failed to update virtual column, skipping the current record {}", logEntries.toString()); } } } @@ -207,11 +213,15 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback @Override public String getVirtualColumnInfo(long offset) { int docId = 
getDocIdFromSourceOffset(offset); - StringBuilder result = new StringBuilder("matched: "); - for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) { - result.append(readerWriter.getInt(docId)).append("; "); + if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) { + StringBuilder result = new StringBuilder("matched: "); + for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) { + result.append(readerWriter.getLong(docId)).append("; "); + } + return result.toString(); + } else { + return "cannot found doc matching offset"; } - return result.toString(); } /** @@ -222,14 +232,14 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback */ private int getDocIdFromSourceOffset(long offset) throws RuntimeException { if (offset < _minSourceOffset || offset - _minSourceOffset >= _sourceOffsetToDocIdArray.length) { - LOGGER.error("offset {} is outside range for current segment {} start offset {} size {}", + LOGGER.warn("offset {} is outside of range for current segment {} start offset {} size {}", offset, _segmentName, _minSourceOffset, _sourceOffsetToDocIdArray.length); - throw new RuntimeException("offset outside range"); + return DEFAULT_DOC_ID_FOR_MISSING_ENTRY; } else { int position = Math.toIntExact(offset - _minSourceOffset); if (_sourceOffsetToDocIdArray[position] == DEFAULT_DOC_ID_FOR_MISSING_ENTRY) { - LOGGER.error("no docId associated with offset {} for segment {}", offset, _segmentName); - throw new RuntimeException("docId not found"); + LOGGER.warn("no docId associated with offset {} for segment {}", offset, _segmentName); + return DEFAULT_DOC_ID_FOR_MISSING_ENTRY; } else { return _sourceOffsetToDocIdArray[position]; } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java index 5f0aa43..f380afa 100644 --- 
a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java @@ -21,7 +21,6 @@ package org.apache.pinot.core.segment.updater; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; -import io.netty.util.internal.ConcurrentSet; import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.utils.CommonConstants; @@ -36,6 +35,7 @@ import org.apache.pinot.grigio.common.rpcQueue.QueueConsumerRecord; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry; import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider; import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager; +import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogTableRetentionManager; import org.apache.pinot.grigio.common.utils.CommonUtils; import org.apache.pinot.grigio.servers.SegmentUpdaterProvider; import org.slf4j.Logger; @@ -46,7 +46,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,7 +74,7 @@ public class SegmentUpdater implements SegmentDeletionListener { private final String _topicPrefix; private final ExecutorService _ingestionExecutorService; private final QueueConsumer _consumer; - private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>(); + private final SegmentUpdaterDataManagerHolder _dataManagersHolder = new SegmentUpdaterDataManagerHolder(); private final Map<String, Map<Integer, Long>> _tablePartitionCreationTime = new ConcurrentHashMap<>(); private final 
UpdateLogStorageProvider _updateLogStorageProvider; private final UpdateLogRetentionManager _retentionManager; @@ -105,7 +104,7 @@ public class SegmentUpdater implements SegmentDeletionListener { public void start() { - String listOfTables = Joiner.on(",").join(_tableSegmentMap.keySet()); + String listOfTables = Joiner.on(",").join(_dataManagersHolder.getAllTables()); LOGGER.info("starting segment updater main loop with the following table in server: {}", listOfTables); _ingestionExecutorService.submit(this::updateLoop); } @@ -162,10 +161,9 @@ public class SegmentUpdater implements SegmentDeletionListener { for (Map.Entry<String, TableUpdateLogs> entry : tableSegmentToUpdateLogs.entrySet()) { String tableName = TableNameBuilder.ensureTableNameWithType(entry.getKey(), CommonConstants.Helix.TableType.REALTIME); int tableMessageCount = 0; - if (_tableSegmentMap.containsKey(tableName)) { - final Map<String, Set<DataManagerCallback>> segmentManagersMap = _tableSegmentMap.get(tableName); + if (_dataManagersHolder.hasTable(tableName)) { final TableUpdateLogs segment2UpdateLogsMap = entry.getValue(); - updateSegmentVirtualColumns(tableName, segmentManagersMap, segment2UpdateLogsMap, timeToStoreUpdateLogs); + updateSegmentVirtualColumns(tableName, segment2UpdateLogsMap, timeToStoreUpdateLogs); } else { LOGGER.warn("got messages for table {} not in this server", tableName); } @@ -202,12 +200,11 @@ public class SegmentUpdater implements SegmentDeletionListener { /** * Update the virtual columns of affected segments of a table. 
*/ - private void updateSegmentVirtualColumns(String tableName, Map<String, Set<DataManagerCallback>> segmentManagersMap, - TableUpdateLogs segment2UpdateLogsMap, AtomicLong timeToStoreUpdateLogs) throws IOException{ + private void updateSegmentVirtualColumns(String tableName, TableUpdateLogs segment2UpdateLogsMap, + AtomicLong timeToStoreUpdateLogs) throws IOException{ for (Map.Entry<String, List<UpdateLogEntry>> segmentEntry : segment2UpdateLogsMap.getSegments2UpdateLog().entrySet()) { final String segmentNameStr = segmentEntry.getKey(); updateVirtualColumn(tableName, segmentNameStr, - segmentManagersMap.computeIfAbsent(segmentNameStr, sn -> new ConcurrentSet<>()), segment2UpdateLogsMap.get(segmentNameStr), timeToStoreUpdateLogs); } } @@ -217,19 +214,21 @@ public class SegmentUpdater implements SegmentDeletionListener { * from consuming to online (mutable segment to immutable segment). In most of cases we expect only one segment manager * in this set of UpsertSegmentDataManager */ - private void updateVirtualColumn(String table, String segment, Set<DataManagerCallback> segmentDataManagers, + private void updateVirtualColumn(String table, String segment, List<UpdateLogEntry> messages, AtomicLong timeToStoreUpdateLogs) throws IOException { + Set<DataManagerCallback> dataManagers = _dataManagersHolder.getDataManagers(table, segment); LOGGER.debug("updating segment {} with {} results for {} data managers", segment, messages.size(), - segmentDataManagers.size()); - if (segmentDataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) { + dataManagers.size()); + if (dataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) { storeUpdateLogs(table, segment, messages, timeToStoreUpdateLogs); } try { - for (DataManagerCallback dataManager: segmentDataManagers) { + // refetch the data managers from holder in case there are updates + for (DataManagerCallback 
dataManager: _dataManagersHolder.getDataManagers(table, segment)) { dataManager.updateVirtualColumns(messages); } } catch (Exception ex) { - LOGGER.error("failed to update virtual column for key ", ex); + LOGGER.error("failed to update virtual column for key", ex); } } @@ -243,14 +242,11 @@ public class SegmentUpdater implements SegmentDeletionListener { DataManagerCallback dataManager) { // TODO get partition assignment from LOGGER.info("segment updater adding table {} segment {}", tableNameWithType, segmentName.getSegmentName()); - if (!_tableSegmentMap.containsKey(tableNameWithType)) { - synchronized (_tableSegmentMap) { - _tableSegmentMap.put(tableNameWithType, new ConcurrentHashMap<>()); - } + if (!_dataManagersHolder.hasTable(tableNameWithType)) { LOGGER.info("adding table {} to segment updater consumer", tableNameWithType); handleNewTableInServer(tableNameWithType); } - _tableSegmentMap.get(tableNameWithType).computeIfAbsent(segmentName.getSegmentName(), sn -> new HashSet<>()).add(dataManager); + _dataManagersHolder.addDataManager(tableNameWithType, segmentName.getSegmentName(), dataManager); synchronized (_tablePartitionCreationTime) { long creationTime = _tablePartitionCreationTime.computeIfAbsent(tableNameWithType, t -> new ConcurrentHashMap<>()) .computeIfAbsent(segmentName.getPartitionId(), p -> segmentName.getCreationTimeStamp()); @@ -262,16 +258,7 @@ public class SegmentUpdater implements SegmentDeletionListener { public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName, DataManagerCallback toDeleteManager) { LOGGER.info("segment updater removing table {} segment {}", tableNameWithType, segmentName); - Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableNameWithType); - if (segmentMap != null) { - Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName); - if (segmentDataManagers != null) { - segmentDataManagers.remove(toDeleteManager); - if (segmentDataManagers.size() == 
0) { - segmentMap.remove(segmentName); - } - } - } + _dataManagersHolder.removeDataManager(tableNameWithType, segmentName, toDeleteManager); } /** @@ -300,22 +287,23 @@ public class SegmentUpdater implements SegmentDeletionListener { @Override public synchronized void onSegmentDeletion(String tableNameWithType, String segmentName) { LOGGER.info("deleting segment virtual column from local storage for table {} segment {}", tableNameWithType, segmentName); - Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType); - if (segmentManagerMap != null) { - if (segmentManagerMap.containsKey(segmentName) && segmentManagerMap.get(segmentName).size() > 0) { - LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size()); - } - try { - segmentManagerMap.remove(segmentName); - _retentionManager.getRetentionManagerForTable(tableNameWithType).notifySegmentDeletion(tableNameWithType); - _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName); - } catch (IOException e) { - throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e); + if (_dataManagersHolder.hasTable(tableNameWithType)) { + boolean result = _dataManagersHolder.removeAllDataManagerForSegment(tableNameWithType, segmentName); + if (result) { + try { + UpdateLogTableRetentionManager retentionManager = _retentionManager.getRetentionManagerForTable(tableNameWithType); + if (retentionManager != null) { + retentionManager.notifySegmentDeletion(segmentName); + } + _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName); + } catch (IOException e) { + throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e); + } } - if (segmentManagerMap.size() == 0) { - _tableSegmentMap.remove(tableNameWithType); + if (_dataManagersHolder.maybeRemoveTable(tableNameWithType)) { 
handleTableRemovalInServer(tableNameWithType); } + } else { LOGGER.error("deleting a segment {}:{} from current server but don't have segment map on updater", tableNameWithType, segmentName); diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java new file mode 100644 index 0000000..4f6bd50 --- /dev/null +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.updater; + +import com.google.common.collect.ImmutableSet; +import org.apache.pinot.core.data.manager.upsert.DataManagerCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * class to manage list of data managers and their associated table/segment for segment updater + */ +@ThreadSafe +public class SegmentUpdaterDataManagerHolder { + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUpdaterDataManagerHolder.class); + + private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>(); + + public SegmentUpdaterDataManagerHolder() {} + + /** + * fetch all tables containing at least one data manager on this server + * @return list of all pinot tables name for the current segment updater + */ + public Set<String> getAllTables() { + return ImmutableSet.copyOf(_tableSegmentMap.keySet()); + } + + /** + * check if there is any data manager associated with the given table + */ + public boolean hasTable(String tableName) { + return _tableSegmentMap.containsKey(tableName); + } + + /** + * get a set of data manager for the given table name and segment name + */ + public synchronized Set<DataManagerCallback> getDataManagers(String tableName, String segmentName) { + if (!_tableSegmentMap.containsKey(tableName)) { + LOGGER.error("try to fetch data manager for non-existing table {} segment {}", tableName, segmentName); + } else { + final Map<String, Set<DataManagerCallback>> segmentDataManagerMap = _tableSegmentMap.get(tableName); + if (segmentDataManagerMap.containsKey(segmentName)) { + return ImmutableSet.copyOf(segmentDataManagerMap.get(segmentName)); + } + } + return ImmutableSet.of(); + } + + /** + * add a data manager for a given table and segment name + */ + public synchronized void addDataManager(String 
tableName, String segmentName, DataManagerCallback dataManager) { + LOGGER.info("adding new data manager to updater for table {}, segment {}", tableName, segmentName); + if (!_tableSegmentMap.containsKey(tableName)) { + _tableSegmentMap.put(tableName, new ConcurrentHashMap<>()); + } + _tableSegmentMap.get(tableName).computeIfAbsent(segmentName, sn -> ConcurrentHashMap.newKeySet()).add(dataManager); + } + + /** + * remove a specific data manager for a given table and segment name. + * do nothing if there is no such data manager for the given table/segment name + */ + public synchronized void removeDataManager(String tableName, String segmentName, + DataManagerCallback toDeleteManager) { + Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableName); + if (segmentMap != null) { + Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName); + if (segmentDataManagers != null) { + segmentDataManagers.remove(toDeleteManager); + LOGGER.info("removing data manager for table {} segment {}", tableName, segmentName); + if (segmentDataManagers.size() == 0) { + segmentMap.remove(segmentName); + } + } + } + } + + /** + * remove all data managers for a table and segment + * @return true if we indeed remove any data manager, false otherwise + */ + public synchronized boolean removeAllDataManagerForSegment(String tableName, String segmentName) { + Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName); + if (segmentManagerMap != null) { + if (segmentManagerMap.containsKey(segmentName)) { + LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size()); + } + Set<DataManagerCallback> result = segmentManagerMap.remove(segmentName); + return result != null; + } + return false; + } + + /** + * check if the table still has any associated data manager. 
If there is no data managers, then removed it from cached + * @return true if the given table is removed, false otherwise + */ + public synchronized boolean maybeRemoveTable(String tableName) { + Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName); + if (segmentManagerMap != null && segmentManagerMap.size() == 0) { + _tableSegmentMap.remove(tableName); + return true; + } + return false; + } + +} diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java new file mode 100644 index 0000000..ca7aceb --- /dev/null +++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.updater; + +import com.google.common.collect.ImmutableSet; +import org.apache.pinot.core.data.manager.upsert.DataManagerCallback; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Set; + +import static org.mockito.Mockito.mock; + +public class SegmentUpdaterDataManagerHolderTest { + private SegmentUpdaterDataManagerHolder emptyHolder; + private SegmentUpdaterDataManagerHolder dataManagerHolder; + + private DataManagerCallback dummyManager1; + private DataManagerCallback dummyManager2; + private DataManagerCallback dummyManager3; + + @BeforeMethod + public void setUp() { + emptyHolder = new SegmentUpdaterDataManagerHolder(); + dataManagerHolder = new SegmentUpdaterDataManagerHolder(); + dummyManager1 = mock(DataManagerCallback.class); + dummyManager2 = mock(DataManagerCallback.class); + dummyManager3 = mock(DataManagerCallback.class); + dataManagerHolder.addDataManager("table", "segment1", dummyManager1); + dataManagerHolder.addDataManager("table", "segment2", dummyManager2); + dataManagerHolder.addDataManager("table2", "segment3", dummyManager3); + } + + @Test + public void testGetAllTables() { + Set<String> tables = dataManagerHolder.getAllTables(); + ensureSetEqual(tables, ImmutableSet.of("table", "table2")); + + Assert.assertEquals(emptyHolder.getAllTables().size(), 0); + } + + @Test + public void testHasTable() { + Assert.assertFalse(emptyHolder.hasTable("table")); + + Assert.assertTrue(dataManagerHolder.hasTable("table")); + Assert.assertTrue(dataManagerHolder.hasTable("table2")); + Assert.assertFalse(dataManagerHolder.hasTable("table3")); + } + + @Test + public void testGetDataManagers() { + Set<DataManagerCallback> dataManagers = dataManagerHolder.getDataManagers("table", "segment1"); + ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager1)); + + dataManagers = dataManagerHolder.getDataManagers("table", "segment2"); + 
ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager2)); + + dataManagers = dataManagerHolder.getDataManagers("table2", "segment3"); + ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager3)); + + // non exist tables/segments + dataManagers = dataManagerHolder.getDataManagers("table2", "segment1"); + ensureSetEqual(dataManagers, ImmutableSet.of()); + + dataManagers = dataManagerHolder.getDataManagers("table3", "segment1"); + ensureSetEqual(dataManagers, ImmutableSet.of()); + } + + @Test + public void testAddDataManager() { + DataManagerCallback dummyManager4 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager5 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager6 = mock(DataManagerCallback.class); + dataManagerHolder.addDataManager("table", "segment1", dummyManager4); + dataManagerHolder.addDataManager("table", "segment2", dummyManager5); + dataManagerHolder.addDataManager("table2", "segment1", dummyManager6); + + Set<DataManagerCallback> tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager1, dummyManager4)); + + tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment2"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager2, dummyManager5)); + + tableSegmentDMs = dataManagerHolder.getDataManagers("table2", "segment1"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager6)); + } + + @Test + public void testRemoveDataManager() { + DataManagerCallback dummyManager4 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager5 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager6 = mock(DataManagerCallback.class); + dataManagerHolder.addDataManager("table", "segment1", dummyManager4); + dataManagerHolder.addDataManager("table", "segment2", dummyManager5); + dataManagerHolder.addDataManager("table2", "segment1", dummyManager6); + Set<DataManagerCallback> tableSegmentDMs; + + // start 
deleting + dataManagerHolder.removeDataManager("table", "segment1", dummyManager1); + dataManagerHolder.removeDataManager("table", "segment1", dummyManager2); + dataManagerHolder.removeDataManager("table", "segment", dummyManager2); + tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager4)); + + // delete all segment + dataManagerHolder.removeDataManager("table", "segment1", dummyManager4); + tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of()); + + // delete some more + dataManagerHolder.removeDataManager("table", "segment1", dummyManager4); + tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1"); + ensureSetEqual(tableSegmentDMs, ImmutableSet.of()); + + // add some back and delete + dataManagerHolder.addDataManager("table", "segment1", dummyManager4); + ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of(dummyManager4)); + + dataManagerHolder.removeDataManager("table", "segment1", dummyManager4); + ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of()); + } + + @Test + public void testRemoveAllDataManagerForSegment() { + DataManagerCallback dummyManager4 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager5 = mock(DataManagerCallback.class); + DataManagerCallback dummyManager6 = mock(DataManagerCallback.class); + dataManagerHolder.addDataManager("table", "segment1", dummyManager4); + dataManagerHolder.addDataManager("table", "segment2", dummyManager5); + dataManagerHolder.addDataManager("table2", "segment1", dummyManager6); + + boolean result = dataManagerHolder.removeAllDataManagerForSegment("table", "segment1"); + ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of()); + Assert.assertTrue(result); + + result = dataManagerHolder.removeAllDataManagerForSegment("table", 
"segment1"); + ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of()); + Assert.assertFalse(result); + + result = dataManagerHolder.removeAllDataManagerForSegment("table3", "segment1"); + Assert.assertFalse(result); + } + + @Test + public void testMaybeRemoveTable() { + + Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table")); + Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table2")); + Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table4")); + + dataManagerHolder.removeAllDataManagerForSegment("table2", "segment3"); + Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table2")); + + dataManagerHolder.removeAllDataManagerForSegment("table", "segment1"); + Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table")); + dataManagerHolder.removeAllDataManagerForSegment("table", "segment2"); + Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table")); + } + + private <T> void ensureSetEqual(Set<T> set1, Set<T> set2) { + Assert.assertEquals(set2.size(), set1.size()); + for (T o: set1) { + Assert.assertTrue(set2.contains(o)); + } + } +} \ No newline at end of file diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java index 13a7612..eb1f3ea 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java @@ -26,19 +26,23 @@ import io.swagger.annotations.ApiResponses; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; -import org.apache.pinot.core.data.manager.UpsertSegmentDataManager; import org.apache.pinot.server.starter.ServerInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; 
import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; @Api(tags = "UpsertDebug") @Path("/") public class UpsertDebugResource { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertDebugResource.class); @Inject ServerInstance serverInstance; @@ -64,6 +68,7 @@ public class UpsertDebugResource { if (tableDataManager == null) { return "no table for " + tableName; } + /* SegmentDataManager segmentDataManager = null; try { segmentDataManager = tableDataManager.acquireSegment(segmentName); @@ -73,12 +78,20 @@ public class UpsertDebugResource { if (!(segmentDataManager instanceof UpsertSegmentDataManager)) { return "it is not an upsert table"; } else { - return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr)); + long offset = Long.parseLong(offsetStr); + LOGGER.info("getting virtual column for table {} segment {} offset {}", tableName, segmentName, offset); + return ( segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr)); } + } catch (Exception ex) { + LOGGER.error("failed to fetch virtual column info", ex); + throw new WebApplicationException("Failed to fetch virtual column info" + ex.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR); } finally { if (segmentDataManager != null) { tableDataManager.releaseSegment(segmentDataManager); } } + */ + return ""; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
