This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e31e6f9 refactor segment loading logic in BaseTableDataManager to
decouple it with local segment directory (#7969)
e31e6f9 is described below
commit e31e6f90cdf9b425c0584484aaac0db4a79b2c15
Author: Xiaobing <[email protected]>
AuthorDate: Thu Jan 13 18:13:34 2022 -0800
refactor segment loading logic in BaseTableDataManager to decouple it with
local segment directory (#7969)
* extend SegmentDirectoryLoader interface and refactor BaseTableDataManager
so that it
depends on SegmentDirectory interface instead of local seg dir to be more
extensible
* add license header
* address comments
* rebase and fix UT
* cr - refine comments, namings and logs, and refactor a few methods
* refactor BaseTableDataMgr with SegmentDirectory.copyTo
* refine naming and comments
* close segdir before early return
* rebase master
* comments for clarity
* refine comments
* add more context in comments
Co-authored-by: Xiaobing Li <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 262 ++++++----
.../data/manager/BaseTableDataManagerTest.java | 534 ++++++++++++++-------
.../local/data/manager/TableDataManager.java | 16 +
.../immutable/ImmutableSegmentLoader.java | 85 +++-
.../loader/DefaultSegmentDirectoryLoader.java | 6 +-
.../index/column/PhysicalColumnIndexContainer.java | 2 +-
.../segment/index/loader/SegmentPreProcessor.java | 34 +-
.../ColumnMinMaxValueGenerator.java | 2 +-
.../defaultcolumn/BaseDefaultColumnHandler.java | 2 +-
.../segment/store/SegmentLocalFSDirectory.java | 24 +
.../spi/index/metadata/SegmentMetadataImpl.java | 18 +-
.../pinot/segment/spi/store/SegmentDirectory.java | 9 +
.../starter/helix/HelixInstanceDataManager.java | 47 +-
.../helix/HelixInstanceDataManagerTest.java | 70 +++
14 files changed, 792 insertions(+), 319 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1a5c013..02aab34 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -36,6 +36,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -53,6 +54,10 @@ import
org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -285,49 +290,40 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
throws Exception {
- File indexDir = localMetadata.getIndexDir();
- Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
-
- File parentFile = indexDir.getParentFile();
- File segmentBackupDir =
- new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+ File indexDir = getSegmentDataDir(segmentName);
try {
- // First rename index directory to segment backup directory so that
original segment have all file descriptors
- // point to the segment backup directory to ensure original segment
serves queries properly
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
- // Rename index directory to segment backup directory (atomic)
- Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
- "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
-
- // Download from remote or copy from local backup directory into index
directory,
- // and then continue to load the segment from index directory.
+ // Download segment from deep store if CRC changes or forced to download;
+ // otherwise, copy backup directory back to the original index directory.
+ // And then continue to load the segment from the index directory.
boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
if (forceDownload) {
LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
} else {
- LOGGER.info("Download segment:{} of table: {} as local crc: {}
mismatches remote crc: {}", segmentName,
+ LOGGER.info("Download segment:{} of table: {} as crc changes from:
{} to: {}", segmentName,
_tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(segmentName, zkMetadata);
} else {
- LOGGER.info("Reload the local copy of segment: {} of table: {}",
segmentName, _tableNameWithType);
- FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ LOGGER.info("Reload existing segment: {} of table: {}", segmentName,
_tableNameWithType);
+ // The indexDir is empty after calling createBackup, as it's renamed
to a backup directory.
+ // The SegmentDirectory should initialize accordingly. Like for
SegmentLocalFSDirectory, it
+ // doesn't load anything from an empty indexDir, but gets the info to
complete the copyTo.
+ try (SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName, indexLoadingConfig)) {
+ segmentDirectory.copyTo(indexDir);
+ }
}
- // Load from index directory and replace the old segment in memory.
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
-
- // Rename segment backup directory to segment temporary directory
(atomic)
- // The reason to first rename then delete is that, renaming is an atomic
operation, but deleting is not. When we
- // rename the segment backup directory to segment temporary directory,
we know the reload already succeeded, so
- // that we can safely delete the segment temporary directory
- File segmentTempDir = new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
- Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
- "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
- segmentTempDir);
- FileUtils.deleteDirectory(segmentTempDir);
+ // Load from indexDir and replace the old segment in memory. What's
inside indexDir
+ // may come from SegmentDirectory.copyTo() or the segment downloaded
from deep store.
+ ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig, schema);
+ addSegment(segment);
+
+ // Remove backup directory to mark the completion of segment reloading.
+ removeBackup(indexDir);
} catch (Exception reloadFailureException) {
try {
LoaderUtils.reloadFailureRecovery(indexDir);
@@ -343,26 +339,19 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
throws Exception {
- if (!isNewSegment(zkMetadata, localMetadata)) {
+ if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
_tableNameWithType, localMetadata.getCrc());
return;
}
- // Try to recover if no local metadata is provided.
- if (localMetadata == null) {
- LOGGER.info("Segment: {} of table: {} is not loaded, checking disk",
segmentName, _tableNameWithType);
- localMetadata = recoverSegmentQuietly(segmentName);
- if (!isNewSegment(zkMetadata, localMetadata)) {
- LOGGER.info("Segment: {} of table {} has crc: {} same as before,
loading", segmentName, _tableNameWithType,
- localMetadata.getCrc());
- if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
- return;
- }
- // Set local metadata to null to indicate that the local segment fails
to load,
- // although it exists and has same crc with the remote one.
- localMetadata = null;
- }
+ // The segment is not loaded by the server if the metadata object is null.
But the segment
+ // may still be kept on the server. For example when server gets
restarted, the segment is
+ // still on the server but the metadata object has not been initialized
yet. In this case,
+ // we should check if the segment exists on server and try to load it. If
the segment does
+ // not exist or fails to get loaded, we download segment from deep store
to load it again.
+ if (localMetadata == null && tryLoadExistingSegment(segmentName,
indexLoadingConfig, zkMetadata)) {
+ return;
}
Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment:
%s of table: %s does not allow download",
@@ -371,9 +360,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Download segment and replace the local one, either due to failure to
recover local segment,
// or the segment data is updated and has new CRC now.
if (localMetadata == null) {
- LOGGER.info("Download segment: {} of table: {} as no good one exists
locally", segmentName, _tableNameWithType);
+ LOGGER.info("Download segment: {} of table: {} as it doesn't exist",
segmentName, _tableNameWithType);
} else {
- LOGGER.info("Download segment: {} of table: {} as local crc: {}
mismatches remote crc: {}.", segmentName,
+ LOGGER.info("Download segment: {} of table: {} as crc changes from: {}
to: {}", segmentName,
_tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
}
File indexDir = downloadSegment(segmentName, zkMetadata);
@@ -392,43 +381,6 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return downloadSegmentFromDeepStore(segmentName, zkMetadata);
}
- /**
- * Server restart during segment reload might leave segment directory in
inconsistent state, like the index
- * directory might not exist but segment backup directory existed. This
method tries to recover from reload
- * failure before checking the existence of the index directory and loading
segment metadata from it.
- */
- private SegmentMetadata recoverSegmentQuietly(String segmentName) {
- File indexDir = getSegmentDataDir(segmentName);
- try {
- LoaderUtils.reloadFailureRecovery(indexDir);
- if (!indexDir.exists()) {
- LOGGER.info("Segment: {} of table: {} is not found on disk",
segmentName, _tableNameWithType);
- return null;
- }
- SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
- LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready
for loading", segmentName,
- _tableNameWithType, localMetadata.getCrc());
- return localMetadata;
- } catch (Exception e) {
- LOGGER.error("Failed to recover segment: {} of table: {} from disk",
segmentName, _tableNameWithType, e);
- FileUtils.deleteQuietly(indexDir);
- return null;
- }
- }
-
- private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
- File indexDir = getSegmentDataDir(segmentName);
- try {
- addSegment(indexDir, indexLoadingConfig);
- LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName,
_tableNameWithType);
- return true;
- } catch (Exception e) {
- FileUtils.deleteQuietly(indexDir);
- LOGGER.error("Failed to load segment: {} of table: {} from disk",
segmentName, _tableNameWithType, e);
- return false;
- }
- }
-
private File downloadSegmentFromDeepStore(String segmentName,
SegmentZKMetadata zkMetadata)
throws Exception {
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
@@ -493,12 +445,148 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return new File(_resourceTmpDir, segmentName);
}
- @VisibleForTesting
- static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
- return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ /**
+ * Create a backup directory to handle failure of segment reloading.
+ * First rename index directory to segment backup directory so that original
segment have all file
+ * descriptors point to the segment backup directory to ensure original
segment serves queries properly.
+ * The original index directory is restored lazily, as depending on the
conditions,
+ * it may be restored from the backup directory or segment downloaded from
deep store.
+ */
+ private void createBackup(File indexDir) {
+ if (!indexDir.exists()) {
+ return;
+ }
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ // Rename index directory to segment backup directory (atomic).
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+ }
+
+ /**
+ * Remove the backup directory to mark the completion of segment reloading.
+ * First rename then delete is as renaming is an atomic operation, but
deleting is not.
+ * When we rename the segment backup directory to segment temporary
directory, we know the reload
+ * already succeeded, so that we can safely delete the segment temporary
directory.
+ */
+ private void removeBackup(File indexDir)
+ throws IOException {
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ if (!segmentBackupDir.exists()) {
+ return;
+ }
+ // Rename segment backup directory to segment temporary directory (atomic).
+ File segmentTempDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+ Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+ "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
+ segmentTempDir);
+ FileUtils.deleteDirectory(segmentTempDir);
+ }
+
+ /**
+ * Try to load the segment potentially still existing on the server.
+ *
+ * @return true if the segment still exists on server, its CRC is still same
with the
+ * one in SegmentZKMetadata and is loaded into memory successfully; false if
it doesn't
+ * exist on the server, its CRC has changed, or it fails to be loaded.
SegmentDirectory
+ * object may be created when trying to load the segment, but it's closed if
the method
+ * returns false; otherwise it's opened and to be referred by
ImmutableSegment object.
+ */
+ private boolean tryLoadExistingSegment(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata) {
+ // Try to recover the segment from potential segment reloading failure.
+ File indexDir = getSegmentDataDir(segmentName);
+ recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+ // Creates the SegmentDirectory object to access the segment metadata.
+ // The metadata is null if the segment doesn't exist yet.
+ SegmentDirectory segmentDirectory = tryInitSegmentDirectory(segmentName,
indexLoadingConfig);
+ SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null :
segmentDirectory.getSegmentMetadata();
+
+ // If the segment doesn't exist on server or its CRC has changed, then we
+ // need to fall back to download the segment from deep store to load it.
+ if (segmentMetadata == null || !hasSameCRC(zkMetadata, segmentMetadata)) {
+ if (segmentMetadata == null) {
+ LOGGER.info("Segment: {} of table: {} does not exist", segmentName,
_tableNameWithType);
+ } else if (!hasSameCRC(zkMetadata, segmentMetadata)) {
+ LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}",
segmentName, _tableNameWithType,
+ segmentMetadata.getCrc(), zkMetadata.getCrc());
+ }
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ return false;
+ }
+
+ try {
+ // If the segment is still kept by the server, then we can
+ // either load it directly if it's still consistent with latest table
config and schema;
+ // or reprocess it to reflect latest table config and schema before
loading.
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory,
indexLoadingConfig, schema)) {
+ LOGGER.info("Segment: {} of table: {} is consistent with latest table
config and schema", segmentName,
+ _tableNameWithType);
+ } else {
+ LOGGER.info("Segment: {} of table: {} needs reprocess to reflect
latest table config and schema", segmentName,
+ _tableNameWithType);
+ segmentDirectory.copyTo(indexDir);
+ // Close the stale SegmentDirectory object and recreate it with
reprocessed segment.
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig,
schema);
+ segmentDirectory = initSegmentDirectory(segmentName,
indexLoadingConfig);
+ }
+ ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig, schema);
+ addSegment(segment);
+ LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}",
segmentName, _tableNameWithType,
+ zkMetadata.getCrc());
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to load existing segment: {} of table: {} with crc:
{}", segmentName, _tableNameWithType, e);
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ return false;
+ }
+ }
+
+ private SegmentDirectory tryInitSegmentDirectory(String segmentName,
IndexLoadingConfig indexLoadingConfig) {
+ try {
+ return initSegmentDirectory(segmentName, indexLoadingConfig);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to initialize SegmentDirectory for segment: {} of
table: {} with error: {}", segmentName,
+ _tableNameWithType, e.getMessage());
+ return null;
+ }
+ }
+
+ private SegmentDirectory initSegmentDirectory(String segmentName,
IndexLoadingConfig indexLoadingConfig)
+ throws Exception {
+ SegmentDirectoryLoaderContext loaderContext =
+ new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getInstanceId(),
+ segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+ SegmentDirectoryLoader segmentDirectoryLoader =
+
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+ File indexDir = getSegmentDataDir(segmentName);
+ return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
}
private static boolean hasSameCRC(SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata) {
return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
}
+
+ private static void recoverReloadFailureQuietly(String tableNameWithType,
String segmentName, File indexDir) {
+ try {
+ LoaderUtils.reloadFailureRecovery(indexDir);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to recover segment: {} of table: {} due to error:
{}", segmentName, tableNameWithType,
+ e.getMessage());
+ }
+ }
+
+ private static void closeSegmentDirectoryQuietly(SegmentDirectory
segmentDirectory) {
+ if (segmentDirectory != null) {
+ try {
+ segmentDirectory.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close SegmentDirectory due to error: {}",
e.getMessage());
+ }
+ }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 78af028..fac8977 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -18,9 +18,15 @@
*/
package org.apache.pinot.core.data.manager;
+import java.io.DataInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
@@ -32,14 +38,25 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterMethod;
@@ -58,9 +75,13 @@ import static org.testng.Assert.fail;
public class BaseTableDataManagerTest {
- private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"OfflineTableDataManagerTest");
-
- private static final String TABLE_NAME = "__table01__";
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"BaseTableDataManagerTest");
+ private static final String TABLE_NAME = "table01";
+ private static final File TABLE_DATA_DIR = new File(TEMP_DIR, TABLE_NAME);
+ private static final String STRING_COLUMN = "col1";
+ private static final String[] STRING_VALUES = {"A", "D", "E", "B", "C"};
+ private static final String LONG_COLUMN = "col2";
+ private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L,
30000L};
@BeforeMethod
public void setUp()
@@ -75,210 +96,328 @@ public class BaseTableDataManagerTest {
FileUtils.deleteDirectory(TEMP_DIR);
}
- private BaseTableDataManager makeTestableManager() {
- TableDataManagerConfig config = mock(TableDataManagerConfig.class);
- when(config.getTableName()).thenReturn(TABLE_NAME);
- when(config.getDataDir()).thenReturn(new File(TEMP_DIR,
TABLE_NAME).getAbsolutePath());
+ @Test
+ public void testReloadSegmentNewData()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName,
SegmentVersion.v3, 5);
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null);
- tableDataManager.start();
- return tableDataManager;
+ // Mock the case where segment is loaded but its CRC is different from
+ // the one in zk, thus raw segment is downloaded and loaded.
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("0");
+
+ BaseTableDataManager tmgr = createTableManager();
+ assertFalse(tmgr.getSegmentDataDir(segName).exists());
+ tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null,
false);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
}
@Test
- public void testReloadSegmentNewData()
+ public void testReloadSegmentUseLocalCopy()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
- File tempRootDir = tmgr.getTmpSegmentDataDir("test-new-data");
-
- // Create an empty segment and compress it to tar.gz as the one in deep
store.
- // All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- File tempInputDir = new File(tempRootDir, "seg01_input");
- FileUtils
- .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remove");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
- FileUtils.deleteQuietly(tempInputDir);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+ // Same CRCs so load the local segment directory directly.
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
- File indexDir = tmgr.getSegmentDataDir("seg01");
- FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
-
- // Different CRCs leading to segment download.
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
SegmentMetadata llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("10240");
- when(llmd.getIndexDir()).thenReturn(indexDir);
+ when(llmd.getCrc()).thenReturn(segCrc);
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null,
false);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
- tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, false);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("k=remove"));
+ FileUtils.deleteQuietly(localSegDir);
+ try {
+ tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd,
null, false);
+ fail();
+ } catch (Exception e) {
+ // As expected, segment reloading fails due to missing the local segment
dir.
+ assertTrue(e.getMessage().contains("does not exist or is not a
directory"));
+ }
}
@Test
- public void testReloadSegmentLocalCopy()
+ public void testReloadSegmentConvertVersion()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
- File tempRootDir = tmgr.getTmpSegmentDataDir("test-local-copy");
-
- // Create an empty segment and compress it to tar.gz as the one in deep
store.
- // All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- File tempInputDir = new File(tempRootDir, "seg01_input");
- FileUtils
- .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
- FileUtils.deleteQuietly(tempInputDir);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+ // Same CRCs so load the local segment directory directly.
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
- File indexDir = tmgr.getSegmentDataDir("seg01");
- FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
-
- // Same CRCs so load the local copy.
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
SegmentMetadata llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("1024");
- when(llmd.getIndexDir()).thenReturn(indexDir);
-
- tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, false);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("k=local"));
+ when(llmd.getCrc()).thenReturn(segCrc);
+
+ // Require to use v3 format.
+ IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+ idxCfg.setSegmentVersion(SegmentVersion.v3);
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.reloadSegment(segName, idxCfg, zkmd, llmd, null, false);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getVersion(), SegmentVersion.v3);
+ assertEquals(llmd.getTotalDocs(), 5);
}
@Test
- public void testReloadSegmentForceDownload()
+ public void testReloadSegmentAddIndex()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
- File tempRootDir = tmgr.getTmpSegmentDataDir("test-force-download");
-
- // Create an empty segment and compress it to tar.gz as the one in deep
store.
- // All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- File tempInputDir = new File(tempRootDir, "seg01_input");
- FileUtils
- .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
- FileUtils.deleteQuietly(tempInputDir);
-
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+ assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN,
SegmentVersion.v3));
+ assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
+
+ // Same CRCs so load the local segment directory directly.
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn(segCrc);
+
+ // Require to add indices.
+ IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+ idxCfg.setSegmentVersion(SegmentVersion.v3);
+ idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN,
LONG_COLUMN)));
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.reloadSegment(segName, idxCfg, zkmd, llmd, null, false);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+ assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName),
STRING_COLUMN, SegmentVersion.v3));
+ assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), LONG_COLUMN,
SegmentVersion.v3));
+ }
- File indexDir = tmgr.getSegmentDataDir("seg01");
- FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+ @Test
+ public void testReloadSegmentForceDownload()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName,
SegmentVersion.v3, 5);
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+
+ // Same CRC but force to download.
+ BaseTableDataManager tmgr = createTableManager();
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getCrc(), zkmd.getCrc() + "");
+
+ // Remove the local segment dir. Segment reloading fails unless force to
download.
+ FileUtils.deleteQuietly(localSegDir);
+ try {
+ tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd,
null, false);
+ fail();
+ } catch (Exception e) {
+ // As expected, segment reloading fails due to missing the local segment
dir.
+ assertTrue(e.getMessage().contains("does not exist or is not a
directory"));
+ }
- // Same CRC but force to download
- SegmentMetadata llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("1024");
- when(llmd.getIndexDir()).thenReturn(indexDir);
+ tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null,
true);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
- tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, true);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("k=remote"));
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getCrc(), zkmd.getCrc() + "");
+ assertEquals(llmd.getTotalDocs(), 5);
}
@Test
public void testAddOrReplaceSegmentNewData()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
- File tempRootDir = tmgr.getTmpSegmentDataDir("test-new-data");
-
- // Create an empty segment and compress it to tar.gz as the one in deep
store.
- // All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- File tempInputDir = new File(tempRootDir, "seg01_input");
- FileUtils.write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
- FileUtils.deleteQuietly(tempInputDir);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName,
SegmentVersion.v3, 5);
- SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
- // Different CRCs leading to segment download.
+ // Mock the case where segment is loaded but its CRC is different from
+ // the one in zk, thus raw segment is downloaded and loaded.
SegmentMetadata llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("10240");
-
- assertFalse(tmgr.getSegmentDataDir("seg01").exists());
- tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
llmd);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("docs=0"));
+ when(llmd.getCrc()).thenReturn("0");
+
+ BaseTableDataManager tmgr = createTableManager();
+ assertFalse(tmgr.getSegmentDataDir(segName).exists());
+ tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, llmd);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
}
@Test
public void testAddOrReplaceSegmentNoop()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
-
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
SegmentMetadata llmd = mock(SegmentMetadata.class);
when(llmd.getCrc()).thenReturn("1024");
+ BaseTableDataManager tmgr = createTableManager();
assertFalse(tmgr.getSegmentDataDir("seg01").exists());
- tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
llmd);
+ tmgr.addOrReplaceSegment("seg01", createIndexLoadingConfig(), zkmd, llmd);
// As CRC is same, the index dir is left as is, so not get created by the
test.
assertFalse(tmgr.getSegmentDataDir("seg01").exists());
}
@Test
- public void testAddOrReplaceSegmentRecovered()
+ public void testAddOrReplaceSegmentUseLocalCopy()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+ // Make local and remote CRC same to skip downloading raw segment.
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- // Make this equal to the default crc value, so no need to make a dummy
creation.meta file.
- when(zkmd.getCrc()).thenReturn(Long.MIN_VALUE);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+ when(zkmd.getDownloadUrl()).thenReturn("file://somewhere");
- File backup = tmgr.getSegmentDataDir("seg01" +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
- FileUtils.write(new File(backup, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01");
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
- assertFalse(tmgr.getSegmentDataDir("seg01").exists());
- tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
null);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("docs=0"));
+ FileUtils.deleteQuietly(localSegDir);
+ try {
+ tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd,
null);
+ fail();
+ } catch (Exception e) {
+ // As expected, when local segment dir is missing, it tries to download
+ // raw segment from deep store, but it would fail with bad download uri.
+ assertEquals(e.getMessage(), "Operation failed after 3 attempts");
+ }
}
@Test
- public void testAddOrReplaceSegmentNotRecovered()
+ public void testAddOrReplaceSegmentUseBackupCopy()
throws Exception {
- BaseTableDataManager tmgr = makeTestableManager();
- File tempRootDir = tmgr.getTmpSegmentDataDir("test-force-download");
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v3);
- // Create an empty segment and compress it to tar.gz as the one in deep
store.
- // All input and intermediate files are put in the tempRootDir.
- File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- File tempInputDir = new File(tempRootDir, "seg01_input");
- FileUtils
- .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
- TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
- FileUtils.deleteQuietly(tempInputDir);
+ // Make local and remote CRC same to skip downloading raw segment.
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+ BaseTableDataManager tmgr = createTableManager();
+ File backup = tmgr.getSegmentDataDir(segName +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ localSegDir.renameTo(backup);
+ assertFalse(tmgr.getSegmentDataDir(segName).exists());
+ tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+ }
+
+ @Test
+ public void testAddOrReplaceSegmentStaleBackupCopy()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName,
SegmentVersion.v3, 5);
+
+ BaseTableDataManager tmgr = createTableManager();
+ // Create a local segment with fewer rows, making its CRC different from
the raw segment.
+ // So that the raw segment is downloaded and loaded in the end.
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
3);
+ File backup = tmgr.getSegmentDataDir(segName +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ localSegDir.renameTo(backup);
+
+ assertFalse(tmgr.getSegmentDataDir(segName).exists());
+ tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+ }
+
+ @Test
+ public void testAddOrReplaceSegmentUpConvertVersion()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+
+ // Make local and remote CRC same to skip downloading raw segment.
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+ // Require to use v3 format.
+ IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+ idxCfg.setSegmentVersion(SegmentVersion.v3);
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getVersion(), SegmentVersion.v3);
+ assertEquals(llmd.getTotalDocs(), 5);
+ }
- // Though can recover from backup, but CRC is different. Local CRC is
Long.MIN_VALUE.
- File backup = tmgr.getSegmentDataDir("seg01" +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
- FileUtils.write(new File(backup, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+ @Test
+ public void testAddOrReplaceSegmentDownConvertVersion()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v3);
- assertFalse(tmgr.getSegmentDataDir("seg01").exists());
- tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
null);
- assertTrue(tmgr.getSegmentDataDir("seg01").exists());
- assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
- .contains("k=remote"));
+ // Make local and remote CRC same to skip downloading raw segment.
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+ // Require to use v1 format.
+ IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+ idxCfg.setSegmentVersion(SegmentVersion.v1);
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ // The existing segment preprocessing logic doesn't down convert segment
format.
+ assertEquals(llmd.getVersion(), SegmentVersion.v3);
+ assertEquals(llmd.getTotalDocs(), 5);
+ }
+
+ @Test
+ public void testAddOrReplaceSegmentAddIndex()
+ throws Exception {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ String segName = "seg01";
+ File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3,
5);
+ String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+ assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN,
SegmentVersion.v3));
+ assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
+
+ // Make local and remote CRC same to skip downloading raw segment.
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+ // Require to add indices.
+ IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+ idxCfg.setSegmentVersion(SegmentVersion.v3);
+ idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN,
LONG_COLUMN)));
+
+ BaseTableDataManager tmgr = createTableManager();
+ tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+ assertTrue(tmgr.getSegmentDataDir(segName).exists());
+ SegmentMetadataImpl llmd = new
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+ assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName),
STRING_COLUMN, SegmentVersion.v3));
+ assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), LONG_COLUMN,
SegmentVersion.v3));
}
@Test
@@ -290,7 +429,7 @@ public class BaseTableDataManagerTest {
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempInput.getAbsolutePath());
- BaseTableDataManager tmgr = makeTestableManager();
+ BaseTableDataManager tmgr = createTableManager();
File tempRootDir = tmgr.getTmpSegmentDataDir("test-download-decrypt");
File tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
@@ -301,9 +440,9 @@ public class BaseTableDataManagerTest {
assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere
remote");
FakePinotCrypter fakeCrypter = (FakePinotCrypter)
PinotCrypterFactory.create("fakePinotCrypter");
- assertTrue(
-
fakeCrypter._origFile.getAbsolutePath().endsWith("__table01__/tmp/test-download-decrypt/seg01.tar.gz.enc"));
-
assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith("__table01__/tmp/test-download-decrypt/seg01.tar.gz"));
+ String parentDir = TABLE_NAME + "/tmp/test-download-decrypt/";
+ assertTrue(fakeCrypter._origFile.getAbsolutePath().endsWith(parentDir +
"seg01.tar.gz.enc"));
+ assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith(parentDir +
"seg01.tar.gz"));
try {
// Set maxRetry to 0 to cause retry failure immediately.
@@ -320,7 +459,7 @@ public class BaseTableDataManagerTest {
@Test
public void testUntarAndMoveSegment()
throws IOException {
- BaseTableDataManager tmgr = makeTestableManager();
+ BaseTableDataManager tmgr = createTableManager();
File tempRootDir = tmgr.getTmpSegmentDataDir("test-untar-move");
// All input and intermediate files are put in the tempRootDir.
@@ -343,22 +482,6 @@ public class BaseTableDataManagerTest {
}
}
- @Test
- public void testIsNewSegmentMetadata()
- throws IOException {
- SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
- when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
- assertTrue(BaseTableDataManager.isNewSegment(zkmd, null));
-
- SegmentMetadata llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("1024");
- assertFalse(BaseTableDataManager.isNewSegment(zkmd, llmd));
-
- llmd = mock(SegmentMetadata.class);
- when(llmd.getCrc()).thenReturn("10245");
- assertTrue(BaseTableDataManager.isNewSegment(zkmd, llmd));
- }
-
// Has to be public class for the class loader to work.
public static class FakePinotCrypter implements PinotCrypter {
private File _origFile;
@@ -392,10 +515,81 @@ public class BaseTableDataManagerTest {
PinotCrypterFactory.init(new PinotConfiguration(properties));
}
- private static IndexLoadingConfig newDummyIndexLoadingConfig() {
- IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
- when(indexLoadingConfig.getReadMode()).thenReturn(ReadMode.mmap);
- when(indexLoadingConfig.getSegmentVersion()).thenReturn(SegmentVersion.v3);
+ private static IndexLoadingConfig createIndexLoadingConfig() {
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+ indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+ indexLoadingConfig.setReadMode(ReadMode.mmap);
return indexLoadingConfig;
}
+
+ private static BaseTableDataManager createTableManager() {
+ TableDataManagerConfig config = mock(TableDataManagerConfig.class);
+ when(config.getTableName()).thenReturn(TABLE_NAME);
+ when(config.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
+
+ OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
+ tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null);
+ tableDataManager.start();
+ return tableDataManager;
+ }
+
+ private static SegmentZKMetadata createRawSegment(TableConfig tableConfig,
String segName, SegmentVersion segVer,
+ int rowCnt)
+ throws Exception {
+ File segDir = createSegment(tableConfig, segName, segVer, rowCnt);
+ String segCrc = getCRC(segDir, SegmentVersion.v3);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ File tempTar = new File(TEMP_DIR, segName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarGzCompressionUtils.createTarGzFile(segDir, tempTar);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+ FileUtils.deleteQuietly(segDir);
+ return zkmd;
+ }
+
+ private static File createSegment(TableConfig tableConfig, String segName,
SegmentVersion segVer, int rowCnt)
+ throws Exception {
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN,
FieldSpec.DataType.STRING)
+ .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build();
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
+ config.setSegmentName(segName);
+ config.setSegmentVersion(segVer);
+ List<GenericRow> rows = new ArrayList<>(3);
+ for (int i = 0; i < rowCnt; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(STRING_COLUMN, STRING_VALUES[i]);
+ row.putValue(LONG_COLUMN, LONG_VALUES[i]);
+ rows.add(row);
+ }
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(rows));
+ driver.build();
+ return new File(TABLE_DATA_DIR, segName);
+ }
+
+ private static String getCRC(File segDir, SegmentVersion segVer)
+ throws IOException {
+ File parentDir = segDir;
+ if (segVer == SegmentVersion.v3) {
+ parentDir = new File(segDir, "v3");
+ }
+ File crcFile = new File(parentDir, V1Constants.SEGMENT_CREATION_META);
+ try (DataInputStream ds = new DataInputStream(new
FileInputStream(crcFile))) {
+ return String.valueOf(ds.readLong());
+ }
+ }
+
+ private static boolean hasInvertedIndex(File segDir, String colName,
SegmentVersion segVer)
+ throws IOException {
+ File parentDir = segDir;
+ if (segVer == SegmentVersion.v3) {
+ parentDir = new File(segDir, "v3");
+ }
+ File idxMapFile = new File(parentDir, V1Constants.INDEX_MAP_FILE_NAME);
+ return FileUtils.readFileToString(idxMapFile).contains(colName +
".inverted_index");
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 22853ed..3ae95a6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -86,6 +86,15 @@ public interface TableDataManager {
* A new segment may be downloaded if the local one has a different CRC; or
can be forced to download
* if forceDownload flag is true. This operation is conducted within a
failure handling framework
* and made transparent to ongoing queries, because the segment is in online
serving state.
+ *
+ * @param segmentName the segment to reload
+ * @param indexLoadingConfig the latest table config to load segment
+ * @param zkMetadata the segment metadata from zookeeper
+ * @param localMetadata the segment metadata object held by server right now,
+ * which must not be null to reload the segment
+ * @param schema the latest table schema to load segment
+ * @param forceDownload whether to force to download raw segment to reload
+ * @throws Exception thrown upon failure when to reload the segment
*/
void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
@@ -97,6 +106,13 @@ public interface TableDataManager {
* This operation is conducted outside the failure handling framework as
used in segment reloading,
* because the segment is not yet online serving queries, e.g. this method
is used to add a new segment,
* or transition a segment to online serving state.
+ *
+ * @param segmentName the segment to add or replace
+ * @param indexLoadingConfig the latest table config to load segment
+ * @param zkMetadata the segment metadata from zookeeper
+ * @param localMetadata the segment metadata object held by server, which
can be null when
+ * the server is restarted or the segment is newly
added to the table
+ * @throws Exception thrown upon failure when to add or replace the segment
*/
void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
@Nullable SegmentMetadata localMetadata)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 164d86a..3ad16ee 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.indexsegment.immutable;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -95,35 +96,59 @@ public class ImmutableSegmentLoader {
* modify the segment like to convert segment format, add or remove indices.
*/
public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig, @Nullable Schema schema,
- boolean shouldModifySegment)
+ boolean needPreprocess)
throws Exception {
Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s
does not exist or is not a directory",
indexDir);
- SegmentMetadataImpl localSegmentMetadata = new
SegmentMetadataImpl(indexDir);
- if (localSegmentMetadata.getTotalDocs() == 0) {
- return new EmptyIndexSegment(localSegmentMetadata);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ if (segmentMetadata.getTotalDocs() == 0) {
+ return new EmptyIndexSegment(segmentMetadata);
}
-
- // This step will modify the segment data on disk.
- if (shouldModifySegment) {
- // Convert segment version as needed.
- convertSegmentFormat(indexDir, indexLoadingConfig, localSegmentMetadata);
- // Preprocess the segment on local using local SegmentDirectory.
- preprocessSegment(indexDir, localSegmentMetadata.getName(),
indexLoadingConfig, schema);
+ if (needPreprocess) {
+ preprocess(indexDir, indexLoadingConfig, schema);
}
-
- // Load the segment again using the configured segmentDirectoryLoader
- PinotConfiguration segmentDirectoryConfigs =
indexLoadingConfig.getSegmentDirectoryConfigs();
+ String segmentName = segmentMetadata.getName();
SegmentDirectoryLoaderContext segmentLoaderContext =
new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getInstanceId(),
- localSegmentMetadata.getName(), segmentDirectoryConfigs);
-
- SegmentDirectoryLoader segmentDirectoryLoader =
+ segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+ SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
- SegmentDirectory actualSegmentDirectory =
segmentDirectoryLoader.load(indexDir.toURI(), segmentLoaderContext);
- SegmentDirectory.Reader segmentReader =
actualSegmentDirectory.createReader();
- SegmentMetadataImpl segmentMetadata =
actualSegmentDirectory.getSegmentMetadata();
+ SegmentDirectory segmentDirectory = segmentLoader.load(indexDir.toURI(),
segmentLoaderContext);
+ try {
+ return load(segmentDirectory, indexLoadingConfig, schema);
+ } catch (Exception e) {
+ LOGGER.error("Failed to load segment: {} with SegmentDirectory",
segmentName, e);
+ segmentDirectory.close();
+ throw e;
+ }
+ }
+
+ /**
+ * Preprocess the local segment directory according to the current table
config and schema.
+ */
+ public static void preprocess(File indexDir, IndexLoadingConfig
indexLoadingConfig, @Nullable Schema schema)
+ throws Exception {
+ Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s
does not exist or is not a directory",
+ indexDir);
+
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ if (segmentMetadata.getTotalDocs() > 0) {
+ convertSegmentFormat(indexDir, indexLoadingConfig, segmentMetadata);
+ preprocessSegment(indexDir, segmentMetadata.getName(),
indexLoadingConfig, schema);
+ }
+ }
+
+ /**
+ * Load the segment represented by the SegmentDirectory object to serve
queries.
+ */
+ public static ImmutableSegment load(SegmentDirectory segmentDirectory,
IndexLoadingConfig indexLoadingConfig,
+ @Nullable Schema schema)
+ throws Exception {
+ SegmentMetadataImpl segmentMetadata =
segmentDirectory.getSegmentMetadata();
+ if (segmentMetadata.getTotalDocs() == 0) {
+ return new EmptyIndexSegment(segmentMetadata);
+ }
// Remove columns not in schema from the metadata
Map<String, ColumnMetadata> columnMetadataMap =
segmentMetadata.getColumnMetadataMap();
@@ -138,16 +163,24 @@ public class ImmutableSegmentLoader {
}
}
+ URI indexDirURI = segmentDirectory.getIndexDir();
+ String scheme = indexDirURI.getScheme();
+ File localIndexDir = null;
+ if (scheme != null && scheme.equalsIgnoreCase("file")) {
+ localIndexDir = new File(indexDirURI);
+ }
+
+ SegmentDirectory.Reader segmentReader = segmentDirectory.createReader();
Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
for (Map.Entry<String, ColumnMetadata> entry :
columnMetadataMap.entrySet()) {
// FIXME: text-index only works with local SegmentDirectory
indexContainerMap.put(entry.getKey(),
- new PhysicalColumnIndexContainer(segmentReader, entry.getValue(),
indexLoadingConfig, indexDir,
+ new PhysicalColumnIndexContainer(segmentReader, entry.getValue(),
indexLoadingConfig, localIndexDir,
IndexingOverrides.getIndexReaderProvider()));
}
// Instantiate virtual columns
- String segmentName = indexDir.getName();
+ String segmentName = segmentMetadata.getName();
Schema segmentSchema = segmentMetadata.getSchema();
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(segmentSchema,
segmentName);
for (FieldSpec fieldSpec : segmentSchema.getAllFieldSpecs()) {
@@ -163,15 +196,15 @@ public class ImmutableSegmentLoader {
// FIXME: star tree only works with local SegmentDirectory
// Load star-tree index if it exists
StarTreeIndexContainer starTreeIndexContainer = null;
- if (segmentMetadata.getStarTreeV2MetadataList() != null) {
+ if (segmentMetadata.getStarTreeV2MetadataList() != null && localIndexDir
!= null) {
starTreeIndexContainer =
- new
StarTreeIndexContainer(SegmentDirectoryPaths.findSegmentDirectory(indexDir),
segmentMetadata,
+ new
StarTreeIndexContainer(SegmentDirectoryPaths.findSegmentDirectory(localIndexDir),
segmentMetadata,
indexContainerMap, indexLoadingConfig.getReadMode());
}
ImmutableSegmentImpl segment =
- new ImmutableSegmentImpl(actualSegmentDirectory, segmentMetadata,
indexContainerMap, starTreeIndexContainer);
- LOGGER.info("Successfully loaded segment {} with config: {}", segmentName,
segmentDirectoryConfigs);
+ new ImmutableSegmentImpl(segmentDirectory, segmentMetadata,
indexContainerMap, starTreeIndexContainer);
+ LOGGER.info("Successfully loaded segment: {} with SegmentDirectory",
segmentName);
return segment;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
index 87bed35..ab836df 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
@@ -47,7 +47,11 @@ public class DefaultSegmentDirectoryLoader implements
SegmentDirectoryLoader {
public SegmentDirectory load(URI indexDir, SegmentDirectoryLoaderContext
segmentLoaderContext)
throws Exception {
PinotConfiguration segmentDirectoryConfigs =
segmentLoaderContext.getSegmentDirectoryConfigs();
- return new SegmentLocalFSDirectory(new File(indexDir),
+ File directory = new File(indexDir);
+ if (!directory.exists()) {
+ return new SegmentLocalFSDirectory(directory);
+ }
+ return new SegmentLocalFSDirectory(directory,
ReadMode.valueOf(segmentDirectoryConfigs.getProperty(IndexLoadingConfig.READ_MODE_KEY)));
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index f233ace..591d9ac 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -92,7 +92,7 @@ public final class PhysicalColumnIndexContainer implements
ColumnIndexContainer
_nullValueVectorReader = null;
}
- if (loadTextIndex) {
+ if (loadTextIndex && segmentIndexDir != null) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName,
ColumnIndexType.TEXT_INDEX));
Map<String, Map<String, String>> columnProperties =
indexLoadingConfig.getColumnProperties();
_textIndex = indexReaderProvider.newTextIndexReader(segmentIndexDir,
metadata, columnProperties.get(columnName));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 47b01ce..18e5ae3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.segment.index.loader;
import java.io.File;
+import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory;
public class SegmentPreProcessor implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreProcessor.class);
- private final File _indexDir;
+ private final URI _indexDirURI;
private final IndexLoadingConfig _indexLoadingConfig;
private final Schema _schema;
private final SegmentDirectory _segmentDirectory;
@@ -63,7 +64,7 @@ public class SegmentPreProcessor implements AutoCloseable {
public SegmentPreProcessor(SegmentDirectory segmentDirectory,
IndexLoadingConfig indexLoadingConfig,
@Nullable Schema schema) {
_segmentDirectory = segmentDirectory;
- _indexDir = new File(segmentDirectory.getIndexDir());
+ _indexDirURI = segmentDirectory.getIndexDir();
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_segmentMetadata = segmentDirectory.getSegmentMetadata();
@@ -82,16 +83,19 @@ public class SegmentPreProcessor implements AutoCloseable {
return;
}
+ // Segment processing has to be done with a local directory.
+ File indexDir = new File(_indexDirURI);
+
// This fixes the issue of temporary files not getting deleted after
creating new inverted indexes.
- removeInvertedIndexTempFiles();
+ removeInvertedIndexTempFiles(indexDir);
try (SegmentDirectory.Writer segmentWriter =
_segmentDirectory.createWriter()) {
// Update default columns according to the schema.
if (_schema != null) {
DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
- .getDefaultColumnHandler(_indexDir, _segmentMetadata,
_indexLoadingConfig, _schema, segmentWriter);
+ .getDefaultColumnHandler(indexDir, _segmentMetadata,
_indexLoadingConfig, _schema, segmentWriter);
defaultColumnHandler.updateDefaultColumns();
- _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ _segmentMetadata = new SegmentMetadataImpl(indexDir);
_segmentDirectory.reloadMetadata();
} else {
LOGGER.warn("Skip creating default columns for segment: {} without
schema", _segmentMetadata.getName());
@@ -105,7 +109,7 @@ public class SegmentPreProcessor implements AutoCloseable {
}
// Create/modify/remove star-trees if required.
- processStarTrees();
+ processStarTrees(indexDir);
// Add min/max value to column metadata according to the prune mode.
// For star-tree index, because it can only increase the range, so
min/max value can still be used in pruner.
@@ -116,7 +120,7 @@ public class SegmentPreProcessor implements AutoCloseable {
new ColumnMinMaxValueGenerator(_segmentMetadata, segmentWriter,
columnMinMaxValueGeneratorMode);
columnMinMaxValueGenerator.addColumnMinMaxValue();
// NOTE: This step may modify the segment metadata. When adding new
steps after this, un-comment the next line.
- // _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ // _segmentMetadata = new SegmentMetadataImpl(indexDir);
}
segmentWriter.save();
@@ -137,7 +141,7 @@ public class SegmentPreProcessor implements AutoCloseable {
// Check if there is need to update default columns according to the
schema.
if (_schema != null) {
DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
- .getDefaultColumnHandler(_indexDir, _segmentMetadata,
_indexLoadingConfig, _schema, null);
+ .getDefaultColumnHandler(null, _segmentMetadata,
_indexLoadingConfig, _schema, null);
if (defaultColumnHandler.needUpdateDefaultColumns()) {
return true;
}
@@ -190,7 +194,7 @@ public class SegmentPreProcessor implements AutoCloseable {
return !starTreeBuilderConfigs.isEmpty();
}
- private void processStarTrees()
+ private void processStarTrees(File indexDir)
throws Exception {
// Create/modify/remove star-trees if required
if (_indexLoadingConfig.isEnableDynamicStarTreeCreation()) {
@@ -204,8 +208,8 @@ public class SegmentPreProcessor implements AutoCloseable {
if
(StarTreeBuilderUtils.shouldRemoveExistingStarTrees(starTreeBuilderConfigs,
starTreeMetadataList)) {
// Remove the existing star-trees
LOGGER.info("Removing star-trees from segment: {}",
_segmentMetadata.getName());
- StarTreeBuilderUtils.removeStarTrees(_indexDir);
- _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ StarTreeBuilderUtils.removeStarTrees(indexDir);
+ _segmentMetadata = new SegmentMetadataImpl(indexDir);
} else {
// Existing star-trees match the builder configs, no need to
generate the star-trees
shouldGenerateStarTree = false;
@@ -214,11 +218,11 @@ public class SegmentPreProcessor implements AutoCloseable
{
// Generate the star-trees if needed
if (shouldGenerateStarTree) {
// NOTE: Always use OFF_HEAP mode on server side.
- try (MultipleTreesBuilder builder = new
MultipleTreesBuilder(starTreeBuilderConfigs, _indexDir,
+ try (MultipleTreesBuilder builder = new
MultipleTreesBuilder(starTreeBuilderConfigs, indexDir,
MultipleTreesBuilder.BuildMode.OFF_HEAP)) {
builder.build();
}
- _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ _segmentMetadata = new SegmentMetadataImpl(indexDir);
}
}
}
@@ -227,8 +231,8 @@ public class SegmentPreProcessor implements AutoCloseable {
* Remove all the existing inverted index temp files before loading
segments, by looking
* for all files in the directory and remove the ones with
'.bitmap.inv.tmp' extension.
*/
- private void removeInvertedIndexTempFiles() {
- File[] directoryListing = _indexDir.listFiles();
+ private void removeInvertedIndexTempFiles(File indexDir) {
+ File[] directoryListing = indexDir.listFiles();
if (directoryListing == null) {
return;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 36cf3fd..7735189 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -51,8 +51,8 @@ public class ColumnMinMaxValueGenerator {
public ColumnMinMaxValueGenerator(SegmentMetadataImpl segmentMetadata,
SegmentDirectory.Writer segmentWriter,
ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode) {
_segmentMetadata = segmentMetadata;
- _segmentProperties =
SegmentMetadataImpl.getPropertiesConfiguration(_segmentMetadata.getIndexDir());
_segmentWriter = segmentWriter;
+ _segmentProperties = segmentMetadata.getPropertiesConfiguration();
_columnMinMaxValueGeneratorMode = columnMinMaxValueGeneratorMode;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 45c0554..892ad8f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -123,7 +123,7 @@ public abstract class BaseDefaultColumnHandler implements
DefaultColumnHandler {
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_segmentWriter = segmentWriter;
- _segmentProperties =
SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
+ _segmentProperties = _segmentMetadata.getPropertiesConfiguration();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index d2dd7c6..9cff2ea 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -36,6 +36,7 @@ import
org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +64,15 @@ public class SegmentLocalFSDirectory extends
SegmentDirectory {
private ColumnIndexDirectory _columnIndexDirectory;
+ // Create an empty SegmentLocalFSDirectory object mainly used to
+ // prepare env for subsequent processing on the segment.
+ public SegmentLocalFSDirectory(File directory) {
+ _indexDir = directory;
+ _segmentDirectory = null;
+ _segmentLock = new SegmentLock();
+ _readMode = null;
+ }
+
public SegmentLocalFSDirectory(File directory, ReadMode readMode)
throws IOException {
this(directory, new SegmentMetadataImpl(directory), readMode);
@@ -95,6 +105,20 @@ public class SegmentLocalFSDirectory extends
SegmentDirectory {
}
@Override
+ public void copyTo(File dest)
+ throws Exception {
+ File src = _indexDir;
+ if (!src.exists()) {
+ // If the original one doesn't exist, then try the backup directory.
+ File parentDir = _indexDir.getParentFile();
+ src = new File(parentDir, _indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ }
+ if (src.exists() && !src.equals(dest)) {
+ FileUtils.copyDirectory(src, dest);
+ }
+ }
+
+ @Override
public SegmentMetadataImpl getSegmentMetadata() {
return _segmentMetadata;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index f0847ab..3fed351 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -80,6 +80,9 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private SegmentVersion _segmentVersion;
private List<StarTreeV2Metadata> _starTreeV2MetadataList;
+ // Keeping properties cached can be costly when the number of segments is
high, according to the
+ // findings in PR #2996. So for now, caching is used only when initializing
from input streams.
+ private PropertiesConfiguration _segmentMetadataPropertiesConfiguration =
null;
private String _creatorName;
private int _totalDocs;
private final Map<String, String> _customMap = new HashMap<>();
@@ -96,13 +99,13 @@ public class SegmentMetadataImpl implements SegmentMetadata
{
_columnMetadataMap = new HashMap<>();
_schema = new Schema();
- PropertiesConfiguration segmentMetadataPropertiesConfiguration =
-
CommonsConfigurationUtils.fromInputStream(metadataPropertiesInputStream);
- init(segmentMetadataPropertiesConfiguration);
+ // Caching properties when initializing from input streams.
+ _segmentMetadataPropertiesConfiguration =
CommonsConfigurationUtils.fromInputStream(metadataPropertiesInputStream);
+ init(_segmentMetadataPropertiesConfiguration);
loadCreationMeta(creationMetaInputStream);
- setTimeInfo(segmentMetadataPropertiesConfiguration);
- _totalDocs =
segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
+ setTimeInfo(_segmentMetadataPropertiesConfiguration);
+ _totalDocs =
_segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
}
/**
@@ -139,6 +142,11 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
_creationTime = creationTime;
}
+ public PropertiesConfiguration getPropertiesConfiguration() {
+ return (_segmentMetadataPropertiesConfiguration != null) ?
_segmentMetadataPropertiesConfiguration
+ : SegmentMetadataImpl.getPropertiesConfiguration(_indexDir);
+ }
+
public static PropertiesConfiguration getPropertiesConfiguration(File
indexDir) {
File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
Preconditions.checkNotNull(metadataFile, "Cannot find segment metadata
file under directory: %s", indexDir);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 7f0dc76..cb207e0 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.spi.store;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
@@ -141,6 +142,14 @@ public abstract class SegmentDirectory implements
Closeable {
}
/**
+ * Copy segment directory to a local directory.
+ * @param dest the destination directory
+ */
+ public void copyTo(File dest)
+ throws Exception {
+ }
+
+ /**
* Reader for columnar index buffers from segment directory
*/
public abstract class Reader implements Closeable {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9d1af9c..21d3c90 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.server.starter.helix;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -54,6 +55,8 @@ import
org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -271,22 +274,18 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
File indexDir = segmentMetadata.getIndexDir();
if (indexDir == null) {
- if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
- LOGGER.info("Skip reloading REALTIME consuming segment: {} in table:
{}", segmentName, tableNameWithType);
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
+ if (segmentDataManager == null) {
return;
}
- Preconditions.checkState(schema != null, "Failed to find schema for
table: {}", tableNameWithType);
- LOGGER.info("Try reloading REALTIME consuming segment: {} in table: {}",
segmentName, tableNameWithType);
- SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
- if (segmentDataManager != null) {
- try {
- MutableSegmentImpl mutableSegment = (MutableSegmentImpl)
segmentDataManager.getSegment();
- mutableSegment.addExtraColumns(schema);
- } finally {
- tableDataManager.releaseSegment(segmentDataManager);
+ try {
+ if (reloadMutableSegment(tableNameWithType, segmentName,
segmentDataManager, schema)) {
+ // A mutable segment has been found and reloaded.
+ return;
}
+ } finally {
+ tableDataManager.releaseSegment(segmentDataManager);
}
- return;
}
SegmentZKMetadata zkMetadata =
@@ -307,6 +306,30 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
}
}
+ /**
+ * Try to reload a mutable segment.
+ * @return true if the segment is mutable (reloaded, or skipped when
consuming-segment reload is disabled); false if the segment
+ * is immutable.
+ */
+ @VisibleForTesting
+ boolean reloadMutableSegment(String tableNameWithType, String segmentName,
+ SegmentDataManager segmentDataManager, @Nullable Schema schema) {
+ IndexSegment segment = segmentDataManager.getSegment();
+ if (segment instanceof ImmutableSegment) {
+ LOGGER.info("Found an immutable segment: {} in table: {}", segmentName,
tableNameWithType);
+ return false;
+ }
+ // Found a mutable/consuming segment from REALTIME table.
+ if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
+ LOGGER.info("Skip reloading REALTIME consuming segment: {} in table:
{}", segmentName, tableNameWithType);
+ return true;
+ }
+ LOGGER.info("Reloading REALTIME consuming segment: {} in table: {}",
segmentName, tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for table:
{}", tableNameWithType);
+ MutableSegmentImpl mutableSegment = (MutableSegmentImpl) segment;
+ mutableSegment.addExtraColumns(schema);
+ return true;
+ }
+
@Override
public void addOrReplaceSegment(String tableNameWithType, String segmentName)
throws Exception {
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
new file mode 100644
index 0000000..a8809bf
--- /dev/null
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.io.File;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class HelixInstanceDataManagerTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"HelixInstanceDataManagerTest");
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+
+ @Test
+ public void testReloadMutableSegment()
+ throws ConfigurationException {
+ // Provides required configs to init the instance data manager.
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty("id", "id01");
+ config.setProperty("dataDir", TEMP_DIR.getAbsolutePath());
+ config.setProperty("segmentTarDir", TEMP_DIR.getAbsolutePath());
+ config.setProperty("readMode", "mmap");
+ config.setProperty("reload.consumingSegment", "false");
+
+ HelixInstanceDataManager mgr = new HelixInstanceDataManager();
+ mgr.init(config, mock(HelixManager.class), mock(ServerMetrics.class));
+
+ SegmentDataManager segMgr = mock(SegmentDataManager.class);
+
+ when(segMgr.getSegment()).thenReturn(mock(ImmutableSegment.class));
+ assertFalse(mgr.reloadMutableSegment("table01", "seg01", segMgr, null));
+
+ when(segMgr.getSegment()).thenReturn(mock(MutableSegment.class));
+ assertTrue(mgr.reloadMutableSegment("table01", "seg01", segMgr, null));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]