This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch deep.store.dir.structure
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/deep.store.dir.structure by
this push:
new 48d7602 Revert deep store directory structure changes after
introducing upsert (#6974)
48d7602 is described below
commit 48d760248917e621b635b01982bce1e5805b7975
Author: Sajjad Moradi <[email protected]>
AuthorDate: Tue May 25 12:16:21 2021 -0700
Revert deep store directory structure changes after introducing upsert
(#6974)
---
.../metadata/segment/ColumnPartitionMetadata.java | 4 +-
.../common/utils/FileUploadDownloadClient.java | 24 ---
.../apache/pinot/common/utils/SegmentUtils.java | 68 -------
.../apache/pinot/controller/ControllerConf.java | 5 +-
.../PinotSegmentUploadDownloadRestletResource.java | 68 +++----
.../pinot/controller/api/upload/ZKOperator.java | 55 +++---
.../helix/core/PinotHelixResourceManager.java | 96 +++------
.../segment/RealtimeSegmentAssignment.java | 35 ++--
.../helix/core/util/ZKMetadataUtils.java | 32 +--
.../api/PinotSegmentRestletResourceTest.java | 6 +-
.../pinot/controller/api/TableViewsTest.java | 2 +-
.../controller/api/upload/ZKOperatorTest.java | 10 +-
.../helix/ControllerInstanceToggleTest.java | 2 +-
.../controller/helix/ControllerSentinelTestV2.java | 6 +-
.../controller/helix/PinotResourceManagerTest.java | 99 ++--------
.../TableRebalancerClusterStatelessTest.java | 4 +-
.../helix/core/retention/RetentionManagerTest.java | 4 +-
.../controller/utils/SegmentMetadataMockUtils.java | 8 -
.../validation/ValidationManagerTest.java | 4 +-
.../manager/realtime/RealtimeTableDataManager.java | 26 +--
.../tests/BaseClusterIntegrationTest.java | 25 ---
.../tests/BasicAuthRealtimeIntegrationTest.java | 6 +-
.../UpsertTableSegmentUploadIntegrationTest.java | 219 ---------------------
.../src/test/resources/upsert_table_test.schema | 33 ----
.../src/test/resources/upsert_test.tar.gz | Bin 9911 -> 0 bytes
.../upsert/PartitionUpsertMetadataManager.java | 2 -
.../apache/pinot/spi/utils/CommonConstants.java | 7 +-
.../pinot/tools/perf/PerfBenchmarkDriver.java | 5 +-
.../pinot/tools/perf/PerfBenchmarkRunner.java | 2 +-
29 files changed, 155 insertions(+), 702 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
index 537f01e..0220c4f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
@@ -34,8 +34,8 @@ import org.apache.commons.lang3.StringUtils;
/**
* Class for partition related column metadata:
* <ul>
- * <li>The name of the Partition function used to map the column values to
their partitions</li>
- * <li>Total number of partitions</li>
+ * <li>Partition function</li>
+ * <li>Number of total partitions</li>
* <li>Set of partitions the column contains</li>
* </ul>
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index ba32144..c874891 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -62,7 +62,6 @@ import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -92,7 +91,6 @@ public class FileUploadDownloadClient implements Closeable {
public static class QueryParameters {
public static final String ENABLE_PARALLEL_PUSH_PROTECTION =
"enableParallelPushProtection";
public static final String TABLE_NAME = "tableName";
- public static final String TABLE_TYPE = "tableType";
}
public enum FileUploadType {
@@ -690,28 +688,6 @@ public class FileUploadDownloadClient implements Closeable
{
}
/**
- * Upload segment with segment file using default settings. Include table
name and type as a request parameters.
- *
- * @param uri URI
- * @param segmentName Segment name
- * @param segmentFile Segment file
- * @param tableName Table name with or without type suffix
- * @param tableType Table type
- * @return Response
- * @throws IOException
- * @throws HttpErrorStatusException
- */
- public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File
segmentFile, String tableName,
- TableType tableType)
- throws IOException, HttpErrorStatusException {
- // Add table name and type request parameters
- NameValuePair tableNameValuePair = new
BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
- NameValuePair tableTypeValuePair = new
BasicNameValuePair(QueryParameters.TABLE_TYPE, tableType.name());
- List<NameValuePair> parameters = Arrays.asList(tableNameValuePair,
tableTypeValuePair);
- return uploadSegment(uri, segmentName, segmentFile, null, parameters,
DEFAULT_SOCKET_TIMEOUT_MS);
- }
-
- /**
* Upload segment with segment file input stream.
*
* Note: table name has to be set as a parameter.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
deleted file mode 100644
index 84c7a79..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.Set;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-
-
-// Util functions related to segments.
-public class SegmentUtils {
- // Returns the partition id of a realtime segment based segment name and
segment metadata info retrieved via Helix.
- // Important: The method is costly because it may read data from zookeeper.
Do not use it in any query execution
- // path.
- public static int getRealtimeSegmentPartitionId(String segmentName, String
realtimeTableName,
- HelixManager helixManager, String partitionColumn) {
- // A fast path if the segmentName is a LLC segment name and we can get the
partition id from the name directly.
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- return new LLCSegmentName(segmentName).getPartitionGroupId();
- }
- // Otherwise, retrieve the partition id from the segment zk metadata.
Currently only realtime segments from upsert
- // enabled tables have partition ids in their segment metadata.
- RealtimeSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
- .getRealtimeSegmentZKMetadata(helixManager.getHelixPropertyStore(),
realtimeTableName, segmentName);
- Preconditions
- .checkState(segmentZKMetadata != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
- segmentName, realtimeTableName);
- return getSegmentPartitionIdFromZkMetaData(realtimeTableName,
segmentZKMetadata, partitionColumn);
- }
-
- private static int getSegmentPartitionIdFromZkMetaData(String
realtimeTableName,
- RealtimeSegmentZKMetadata segmentZKMetadata, String partitionColumn) {
- String segmentName = segmentZKMetadata.getSegmentName();
- Preconditions.checkState(segmentZKMetadata.getPartitionMetadata() != null,
- "Segment ZK metadata for segment: %s of table: %s does not contain
partition metadata", segmentName,
- realtimeTableName);
-
- ColumnPartitionMetadata partitionMetadata =
-
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(partitionColumn);
- Preconditions.checkState(partitionMetadata != null,
- "Segment ZK metadata for segment: %s of table: %s does not contain
partition metadata for column: %s. Check if the table is an upsert table.",
- segmentName, realtimeTableName, partitionColumn);
- Set<Integer> partitions = partitionMetadata.getPartitions();
- Preconditions.checkState(partitions.size() == 1,
- "Segment ZK metadata for segment: %s of table: %s contains multiple
partitions for column: %s with %s",
- segmentName, realtimeTableName, partitionColumn, partitions);
- return partitions.iterator().next();
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index ce91c01..9d47b27 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -80,8 +80,6 @@ public class ControllerConf extends PinotConfiguration {
"controller.offline.segment.interval.checker.frequencyInSeconds";
public static final String
REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
"controller.realtime.segment.validation.frequencyInSeconds";
- public static final String
REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
- "controller.realtime.segment.validation.initialDelayInSeconds";
public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS
=
"controller.broker.resource.validation.frequencyInSeconds";
public static final String
BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
@@ -630,8 +628,7 @@ public class ControllerConf extends PinotConfiguration {
}
public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
- return
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
- getPeriodicTaskInitialDelayInSeconds());
+ return getPeriodicTaskInitialDelayInSeconds();
}
public long getPinotTaskManagerInitialDelaySeconds() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 35a3e07..7584ab0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -184,7 +184,7 @@ public class PinotSegmentUploadDownloadRestletResource {
return builder.build();
}
- private SuccessResponse uploadSegment(@Nullable String tableName, TableType
tableType, FormDataMultiPart multiPart,
+ private SuccessResponse uploadSegment(@Nullable String tableName,
FormDataMultiPart multiPart,
boolean enableParallelPushProtection, HttpHeaders headers, Request
request, boolean moveSegmentToFinalLocation) {
String uploadTypeStr = null;
String crypterClassNameInHeader = null;
@@ -248,36 +248,24 @@ public class PinotSegmentUploadDownloadRestletResource {
LOGGER.info("Uploading a segment {} to table: {}, push type {},
(Derived from segment metadata)", segmentName, tableName, uploadType);
}
- String tableNameWithType;
- if (tableType == TableType.OFFLINE) {
- tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
- } else {
- if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
- throw new UnsupportedOperationException(
- "Upload segment to non-upsert realtime table is not supported "
+ rawTableName);
- }
- tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
- }
-
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String clientAddress =
InetAddress.getByName(request.getRemoteAddr()).getHostName();
LOGGER.info("Processing upload request for segment: {} of table: {} from
client: {}, ingestion descriptor: {}",
- segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
+ segmentName, offlineTableName, clientAddress, ingestionDescriptor);
- // Skip segment validation if upload is to an offline table and only
segment metadata. Skip segment validation for
- // realtime tables because the feature is experimental and only
applicable to upsert enabled table currently.
- if (tableType == TableType.OFFLINE && uploadType !=
FileUploadDownloadClient.FileUploadType.METADATA) {
+ // Skip segment validation if upload only segment metadata
+ if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
// Validate segment
new SegmentValidator(_pinotHelixResourceManager, _controllerConf,
_executor, _connectionManager,
- _controllerMetrics,
_leadControllerManager.isLeaderForTable(tableNameWithType))
- .validateOfflineSegment(tableNameWithType, segmentMetadata,
tempSegmentDir);
+ _controllerMetrics,
_leadControllerManager.isLeaderForTable(offlineTableName)).validateOfflineSegment(offlineTableName,
segmentMetadata, tempSegmentDir);
}
// Encrypt segment
String crypterClassNameInTableConfig =
-
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType);
+
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName);
Pair<String, File> encryptionInfo =
encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile,
uploadedSegmentIsEncrypted,
- crypterClassNameInHeader, crypterClassNameInTableConfig,
segmentName, tableNameWithType);
+ crypterClassNameInHeader, crypterClassNameInTableConfig,
segmentName, tableName);
String crypterClassName = encryptionInfo.getLeft();
File finalSegmentFile = encryptionInfo.getRight();
@@ -289,17 +277,17 @@ public class PinotSegmentUploadDownloadRestletResource {
if (!moveSegmentToFinalLocation) {
LOGGER
.info("Setting zkDownloadUri: to {} for segment: {} of table: {},
skipping move", downloadUri, segmentName,
- tableNameWithType);
+ offlineTableName);
zkDownloadUri = downloadUri;
} else {
- zkDownloadUri = getZkDownloadURIForSegmentUpload(tableNameWithType,
segmentName);
+ zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName,
segmentName);
}
// Zk operations
- completeZkOperations(enableParallelPushProtection, headers,
finalSegmentFile, tableNameWithType, segmentMetadata,
+ completeZkOperations(enableParallelPushProtection, headers,
finalSegmentFile, rawTableName, segmentMetadata,
segmentName, zkDownloadUri, moveSegmentToFinalLocation,
crypterClassName);
- return new SuccessResponse("Successfully uploaded segment: " +
segmentName + " of table: " + tableNameWithType);
+ return new SuccessResponse("Successfully uploaded segment: " +
segmentName + " of table: " + rawTableName);
} catch (WebApplicationException e) {
throw e;
} catch (Exception e) {
@@ -324,7 +312,7 @@ public class PinotSegmentUploadDownloadRestletResource {
Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File
tempEncryptedFile,
boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment,
String crypterClassNameInTableConfig,
- String segmentName, String tableNameWithType) {
+ String segmentName, String tableName) {
boolean segmentNeedsEncryption =
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
@@ -343,13 +331,13 @@ public class PinotSegmentUploadDownloadRestletResource {
throw new ControllerApplicationException(LOGGER, String.format(
"Uploaded segment is encrypted with '%s' while table config requires
'%s' as crypter "
+ "(segment name = '%s', table name = '%s').",
crypterUsedInUploadedSegment,
- crypterClassNameInTableConfig, segmentName, tableNameWithType),
Response.Status.INTERNAL_SERVER_ERROR);
+ crypterClassNameInTableConfig, segmentName, tableName),
Response.Status.INTERNAL_SERVER_ERROR);
}
// encrypt segment
PinotCrypter pinotCrypter =
PinotCrypterFactory.create(crypterClassNameInTableConfig);
LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment
name = '{}', table name = '{}').",
- crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile,
segmentName, tableNameWithType);
+ crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile,
segmentName, tableName);
pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile);
return out;
@@ -386,14 +374,14 @@ public class PinotSegmentUploadDownloadRestletResource {
}
private void completeZkOperations(boolean enableParallelPushProtection,
HttpHeaders headers, File uploadedSegmentFile,
- String tableNameWithType, SegmentMetadata segmentMetadata, String
segmentName, String zkDownloadURI,
+ String rawTableName, SegmentMetadata segmentMetadata, String
segmentName, String zkDownloadURI,
boolean moveSegmentToFinalLocation, String crypter)
throws Exception {
URI finalSegmentLocationURI = URIUtils
-
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
tableNameWithType,
+
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
URIUtils.encode(segmentName));
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager,
_controllerConf, _controllerMetrics);
- zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata,
finalSegmentLocationURI, uploadedSegmentFile,
+ zkOperator.completeSegmentOperations(rawTableName, segmentMetadata,
finalSegmentLocationURI, uploadedSegmentFile,
enableParallelPushProtection, headers, zkDownloadURI,
moveSegmentToFinalLocation, crypter);
}
@@ -416,13 +404,10 @@ public class PinotSegmentUploadDownloadRestletResource {
// it keeps it at the downloadURI header that is set. We will not support
this endpoint going forward.
public void uploadSegmentAsJson(String segmentJsonStr,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String
tableName,
- @ApiParam(value = "Type of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
@DefaultValue("OFFLINE") String tableType,
@ApiParam(value = "Whether to enable parallel push protection")
@DefaultValue("false")
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
boolean enableParallelPushProtection,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, enableParallelPushProtection,
- headers, request, false));
+ asyncResponse.resume(uploadSegment(tableName, null,
enableParallelPushProtection, headers, request, false));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -438,13 +423,10 @@ public class PinotSegmentUploadDownloadRestletResource {
// For the multipart endpoint, we will always move segment to final location
regardless of the segment endpoint.
public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String
tableName,
- @ApiParam(value = "Type of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
@DefaultValue("OFFLINE") String tableType,
@ApiParam(value = "Whether to enable parallel push protection")
@DefaultValue("false")
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
boolean enableParallelPushProtection,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
multiPart, enableParallelPushProtection,
- headers, request, true));
+ asyncResponse.resume(uploadSegment(tableName, multiPart,
enableParallelPushProtection, headers, request, true));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -462,13 +444,10 @@ public class PinotSegmentUploadDownloadRestletResource {
// endpoint in how it moves the segment to a Pinot-determined final
directory.
public void uploadSegmentAsJsonV2(String segmentJsonStr,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String
tableName,
- @ApiParam(value = "Type of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
@DefaultValue("OFFLINE") String tableType,
@ApiParam(value = "Whether to enable parallel push protection")
@DefaultValue("false")
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
boolean enableParallelPushProtection,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, enableParallelPushProtection,
- headers, request, true));
+ asyncResponse.resume(uploadSegment(tableName, null,
enableParallelPushProtection, headers, request, true));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -484,13 +463,10 @@ public class PinotSegmentUploadDownloadRestletResource {
// This behavior does not differ from v1 of the same endpoint.
public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String
tableName,
- @ApiParam(value = "Type of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
@DefaultValue("OFFLINE") String tableType,
@ApiParam(value = "Whether to enable parallel push protection")
@DefaultValue("false")
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
boolean enableParallelPushProtection,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
multiPart, enableParallelPushProtection,
- headers, request, true));
+ asyncResponse.resume(uploadSegment(tableName, multiPart,
enableParallelPushProtection, headers, request, true));
} catch (Throwable t) {
asyncResponse.resume(t);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 0c6ad65..7b743c6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -57,36 +57,33 @@ public class ZKOperator {
_controllerMetrics = controllerMetrics;
}
- public void completeSegmentOperations(String tableNameWithType,
SegmentMetadata segmentMetadata,
+ public void completeSegmentOperations(String rawTableName, SegmentMetadata
segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean
enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI, boolean
moveSegmentToFinalLocation, String crypter)
throws Exception {
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
- ZNRecord segmentMetadataZnRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+
+ // Brand new segment, not refresh, directly add the segment
+ ZNRecord segmentMetadataZnRecord =
+
_pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName,
segmentName);
if (segmentMetadataZnRecord == null) {
- LOGGER.info("Adding new segment {} from table {}", segmentName,
tableNameWithType);
+ LOGGER.info("Adding new segment {} from table {}", segmentName,
rawTableName);
processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, crypter,
- tableNameWithType, segmentName, moveSegmentToFinalLocation);
+ rawTableName, segmentName, moveSegmentToFinalLocation);
return;
}
- // TODO Allow segment refreshing for realtime tables.
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- throw new ControllerApplicationException(LOGGER,
- "Refresh existing segment " + segmentName + " for realtime table " +
tableNameWithType + " is not yet supported ",
- Response.Status.NOT_IMPLEMENTED
- );
- }
+ LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, rawTableName);
- LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, tableNameWithType);
processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI, crypter,
tableNameWithType, segmentName,
+ enableParallelPushProtection, headers, zkDownloadURI, crypter,
offlineTableName, segmentName,
segmentMetadataZnRecord, moveSegmentToFinalLocation);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI,
- String crypter, String tableNameWithType, String segmentName, ZNRecord
znRecord,
+ String crypter, String offlineTableName, String segmentName, ZNRecord
znRecord,
boolean moveSegmentToFinalLocation)
throws Exception {
@@ -94,7 +91,7 @@ public class ZKOperator {
long existingCrc = existingSegmentZKMetadata.getCrc();
// Check if CRC match when IF-MATCH header is set
- checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+ checkCRC(headers, offlineTableName, segmentName, existingCrc);
// Check segment upload start time when parallel push protection enabled
if (enableParallelPushProtection) {
@@ -104,12 +101,12 @@ public class ZKOperator {
if (System.currentTimeMillis() - segmentUploadStartTime >
_controllerConf.getSegmentUploadTimeoutInMillis()) {
// Last segment upload does not finish properly, replace the segment
LOGGER
- .error("Segment: {} of table: {} was not properly uploaded,
replacing it", segmentName, tableNameWithType);
+ .error("Segment: {} of table: {} was not properly uploaded,
replacing it", segmentName, offlineTableName);
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED,
1L);
} else {
// Another segment upload is in progress
throw new ControllerApplicationException(LOGGER,
- "Another segment upload is in progress for segment: " +
segmentName + " of table: " + tableNameWithType
+ "Another segment upload is in progress for segment: " +
segmentName + " of table: " + offlineTableName
+ ", retry later", Response.Status.CONFLICT);
}
}
@@ -117,9 +114,9 @@ public class ZKOperator {
// Lock the segment by setting the upload start time in ZK
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
if (!_pinotHelixResourceManager
- .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
znRecord.getVersion())) {
+ .updateZkMetadata(offlineTableName, existingSegmentZKMetadata,
znRecord.getVersion())) {
throw new ControllerApplicationException(LOGGER,
- "Failed to lock the segment: " + segmentName + " of table: " +
tableNameWithType + ", retry later",
+ "Failed to lock the segment: " + segmentName + " of table: " +
offlineTableName + ", retry later",
Response.Status.CONFLICT);
}
}
@@ -155,9 +152,9 @@ public class ZKOperator {
// (creation time is not included in the crc)
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
- if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
existingSegmentZKMetadata)) {
+ if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName,
existingSegmentZKMetadata)) {
throw new RuntimeException(
- "Failed to update ZK metadata for segment: " + segmentName + "
of table: " + tableNameWithType);
+ "Failed to update ZK metadata for segment: " + segmentName + "
of table: " + offlineTableName);
}
} else {
// New segment is different with the existing one, update ZK metadata
and refresh the segment
@@ -169,16 +166,16 @@ public class ZKOperator {
LOGGER.info("Moved segment {} from temp location {} to {}",
segmentName,
currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
} else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {}
at {}", segmentName, tableNameWithType,
+ LOGGER.info("Skipping segment move, keeping segment {} from table {}
at {}", segmentName, offlineTableName,
zkDownloadURI);
}
_pinotHelixResourceManager
- .refreshSegment(tableNameWithType, segmentMetadata,
existingSegmentZKMetadata, zkDownloadURI, crypter);
+ .refreshSegment(offlineTableName, segmentMetadata,
existingSegmentZKMetadata, zkDownloadURI, crypter);
}
} catch (Exception e) {
- if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
existingSegmentZKMetadata)) {
- LOGGER.error("Failed to update ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
+ if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName,
existingSegmentZKMetadata)) {
+ LOGGER.error("Failed to update ZK metadata for segment: {} of table:
{}", segmentName, offlineTableName);
}
throw e;
}
@@ -204,7 +201,7 @@ public class ZKOperator {
}
private void processNewSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
- File currentSegmentLocation, String zkDownloadURI, String crypter,
String tableNameWithType, String segmentName,
+ File currentSegmentLocation, String zkDownloadURI, String crypter,
String rawTableName, String segmentName,
boolean moveSegmentToFinalLocation) {
// For v1 segment uploads, we will not move the segment
if (moveSegmentToFinalLocation) {
@@ -214,14 +211,14 @@ public class ZKOperator {
.info("Moved segment {} from temp location {} to {}", segmentName,
currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
} catch (Exception e) {
- LOGGER.error("Could not move segment {} from table {} to permanent
directory", segmentName, tableNameWithType, e);
+ LOGGER.error("Could not move segment {} from table {} to permanent
directory", segmentName, rawTableName, e);
throw new RuntimeException(e);
}
} else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {} at
{}", segmentName, tableNameWithType,
+ LOGGER.info("Skipping segment move, keeping segment {} from table {} at
{}", segmentName, rawTableName,
zkDownloadURI);
}
- _pinotHelixResourceManager.addNewSegment(tableNameWithType,
segmentMetadata, zkDownloadURI, crypter);
+ _pinotHelixResourceManager.addNewSegment(rawTableName, segmentMetadata,
zkDownloadURI, crypter);
}
private void moveSegmentToPermanentDirectory(File currentSegmentLocation,
URI finalSegmentLocationURI)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8441d12..b1a9369 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -86,7 +86,6 @@ import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.HashUtil;
@@ -121,7 +120,6 @@ import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
@@ -1625,110 +1623,72 @@ public class PinotHelixResourceManager {
return instanceSet;
}
- public void addNewSegment(String tableNameWithType, SegmentMetadata
segmentMetadata, String downloadUrl) {
- addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
+ public void addNewSegment(String tableName, SegmentMetadata segmentMetadata,
String downloadUrl) {
+ addNewSegment(tableName, segmentMetadata, downloadUrl, null);
}
- public void addNewSegment(String tableNameWithType, SegmentMetadata
segmentMetadata, String downloadUrl,
+ public void addNewSegment(String tableName, SegmentMetadata segmentMetadata,
String downloadUrl,
@Nullable String crypter) {
String segmentName = segmentMetadata.getName();
- InstancePartitionsType instancePartitionsType;
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
// NOTE: must first set the segment ZK metadata before assigning segment
to instances because segment assignment
// might need them to determine the partition of the segment, and server
will need them to download the segment
- ZNRecord znRecord;
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- Preconditions.checkState(isUpsertTable(tableNameWithType),
- "Upload segment " + segmentName + " for non upsert enabled realtime
table " + tableNameWithType
- + " is not supported");
- // In an upsert enabled LLC realtime table, all segments of the same
partition are collocated on the same server
- // -- consuming or completed. So it is fine to use CONSUMING as the
InstancePartitionsType.
- // TODO When upload segments is open to all realtime tables, we should
change the type to COMPLETED instead.
- // In addition, RealtimeSegmentAssignment.assignSegment(..) method
should be updated so that the method does not
- // assign segments to CONSUMING instance partition only.
- instancePartitionsType = InstancePartitionsType.CONSUMING;
- // Build the realtime segment zk metadata with necessary fields.
- LLCRealtimeSegmentZKMetadata segmentZKMetadata = new
LLCRealtimeSegmentZKMetadata();
- ZKMetadataUtils
- .updateSegmentMetadata(segmentZKMetadata, segmentMetadata,
CommonConstants.Segment.SegmentType.REALTIME);
- segmentZKMetadata.setDownloadUrl(downloadUrl);
- segmentZKMetadata.setCrypterName(crypter);
-
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
- znRecord = segmentZKMetadata.toZNRecord();
- } else {
- instancePartitionsType = InstancePartitionsType.OFFLINE;
- // Build the offline segment zk metadata with necessary fields.
- OfflineSegmentZKMetadata segmentZKMetadata = new
OfflineSegmentZKMetadata();
- ZKMetadataUtils
- .updateSegmentMetadata(segmentZKMetadata, segmentMetadata,
CommonConstants.Segment.SegmentType.OFFLINE);
- segmentZKMetadata.setDownloadUrl(downloadUrl);
- segmentZKMetadata.setCrypterName(crypter);
- segmentZKMetadata.setPushTime(System.currentTimeMillis());
- znRecord = segmentZKMetadata.toZNRecord();
- }
+ OfflineSegmentZKMetadata offlineSegmentZKMetadata = new
OfflineSegmentZKMetadata();
+ ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata);
+ offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
+ offlineSegmentZKMetadata.setCrypterName(crypter);
+ offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
String segmentZKMetadataPath =
-
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName);
+
ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName,
segmentName);
Preconditions.checkState(
- _propertyStore.set(segmentZKMetadataPath, znRecord,
AccessOption.PERSISTENT),
- "Failed to set segment ZK metadata for table: " + tableNameWithType +
", segment: " + segmentName);
- LOGGER.info("Added segment: {} of table: {} to property store",
segmentName, tableNameWithType);
- assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath,
instancePartitionsType);
- }
-
+ _propertyStore.set(segmentZKMetadataPath,
offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT),
+ "Failed to set segment ZK metadata for table: " + offlineTableName +
", segment: " + segmentName);
+ LOGGER.info("Added segment: {} of table: {} to property store",
segmentName, offlineTableName);
- private void assignTableSegment(String tableNameWithType, String
segmentName, String segmentZKMetadataPath,
- InstancePartitionsType instancePartitionsType) {
// Assign instances for the segment and add it into IdealState
try {
- TableConfig tableConfig = getTableConfig(tableNameWithType);
+ TableConfig offlineTableConfig = getTableConfig(offlineTableName);
Preconditions
- .checkState(tableConfig != null, "Failed to find table config for
table: " + tableNameWithType);
+ .checkState(offlineTableConfig != null, "Failed to find table config
for table: " + offlineTableName);
SegmentAssignment segmentAssignment =
- SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
tableConfig);
+ SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
offlineTableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
Collections
- .singletonMap(instancePartitionsType, InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
instancePartitionsType));
- synchronized (getTableUpdaterLock(tableNameWithType)) {
- HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
+ .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
+ .fetchOrComputeInstancePartitions(_helixZkManager,
offlineTableConfig, InstancePartitionsType.OFFLINE));
+ synchronized (getTableUpdaterLock(offlineTableName)) {
+ HelixHelper.updateIdealState(_helixZkManager, offlineTableName,
idealState -> {
assert idealState != null;
Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
if (currentAssignment.containsKey(segmentName)) {
LOGGER.warn("Segment: {} already exists in the IdealState for
table: {}, do not update", segmentName,
- tableNameWithType);
+ offlineTableName);
} else {
List<String> assignedInstances =
segmentAssignment.assignSegment(segmentName,
currentAssignment, instancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table:
{}", segmentName, assignedInstances,
- tableNameWithType);
+ offlineTableName);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
}
return idealState;
});
- LOGGER.info("Added segment: {} to IdealState for table: {}",
segmentName, tableNameWithType);
+ LOGGER.info("Added segment: {} to IdealState for table: {}",
segmentName, offlineTableName);
}
} catch (Exception e) {
LOGGER
.error("Caught exception while adding segment: {} to IdealState for
table: {}, deleting segment ZK metadata",
- segmentName, tableNameWithType, e);
+ segmentName, offlineTableName, e);
if (_propertyStore.remove(segmentZKMetadataPath,
AccessOption.PERSISTENT)) {
- LOGGER.info("Deleted segment ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
+ LOGGER.info("Deleted segment ZK metadata for segment: {} of table:
{}", segmentName, offlineTableName);
} else {
LOGGER
- .error("Failed to deleted segment ZK metadata for segment: {} of
table: {}", segmentName, tableNameWithType);
+ .error("Failed to deleted segment ZK metadata for segment: {} of
table: {}", segmentName, offlineTableName);
}
throw e;
}
}
- public boolean isUpsertTable(String tableName) {
- TableConfig realtimeTableConfig =
getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName));
- if (realtimeTableConfig == null) {
- return false;
- }
- UpsertConfig upsertConfig = realtimeTableConfig.getUpsertConfig();
- return ((upsertConfig != null) && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE);
- }
-
private Object getTableUpdaterLock(String offlineTableName) {
return _tableUpdaterLocks[(offlineTableName.hashCode() &
Integer.MAX_VALUE) % _tableUpdaterLocks.length];
}
@@ -1757,7 +1717,7 @@ public class PinotHelixResourceManager {
// ZK metadata to refresh the segment (server will compare the segment ZK
metadata with the local metadata to decide
// whether to download the new segment; broker will update the segment
partition info & time boundary based on
// the segment ZK metadata)
- ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+ ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata);
offlineSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
offlineSegmentZKMetadata.setCrypterName(crypter);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 0e1974a..4f2ee84 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -32,9 +32,8 @@ import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
-import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -86,19 +85,15 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
private HelixManager _helixManager;
private String _realtimeTableName;
private int _replication;
- private String _partitionColumn;
@Override
public void init(HelixManager helixManager, TableConfig tableConfig) {
_helixManager = helixManager;
_realtimeTableName = tableConfig.getTableName();
_replication =
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
- ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
- tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
- _partitionColumn = replicaGroupStrategyConfig != null ?
replicaGroupStrategyConfig.getPartitionColumn() : null;
- LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {},
partitionColumn: {} for table: {}",
- _replication, _partitionColumn, _realtimeTableName);
+ LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {}
for table: {}", _replication,
+ _realtimeTableName);
}
@Override
@@ -141,8 +136,7 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
* Helper method to assign instances for CONSUMING segment based on the
segment partition id and instance partitions.
*/
private List<String> assignConsumingSegment(String segmentName,
InstancePartitions instancePartitions) {
- int partitionGroupId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_realtimeTableName, _helixManager, _partitionColumn);
+ int partitionGroupId = new
LLCSegmentName(segmentName).getPartitionGroupId();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
if (numReplicaGroups == 1) {
@@ -180,8 +174,9 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
- @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
Configuration config) {
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+ @Nullable List<Tier> sortedTiers, @Nullable Map<String,
InstancePartitions> tierInstancePartitionsMap,
+ Configuration config) {
InstancePartitions completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
InstancePartitions consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
@@ -303,7 +298,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
* Rebalances segments in the current assignment using the
instancePartitions and returns new assignment
*/
private Map<String, Map<String, String>> reassignSegments(String
instancePartitionType,
- Map<String, Map<String, String>> currentAssignment, InstancePartitions
instancePartitions, boolean bootstrap) {
+ Map<String, Map<String, String>> currentAssignment, InstancePartitions
instancePartitions,
+ boolean bootstrap) {
Map<String, Map<String, String>> newAssignment;
if (bootstrap) {
LOGGER.info("Bootstrapping segment assignment for {} segments of table:
{}", instancePartitionType,
@@ -320,8 +316,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
if (instancePartitions.getNumReplicaGroups() == 1) {
// Non-replica-group based assignment
- List<String> instances =
-
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
_replication);
+ List<String> instances = SegmentAssignmentUtils
+ .getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
_replication);
newAssignment = SegmentAssignmentUtils
.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
instances, _replication);
} else {
@@ -329,8 +325,7 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new
HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
- int partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName,
_helixManager, _partitionColumn);
+ int partitionGroupId = new
LLCSegmentName(segmentName).getPartitionGroupId();
partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k ->
new ArrayList<>()).add(segmentName);
}
@@ -343,7 +338,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
}
newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions, partitionGroupIdToSegmentsMap);
+ .rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions,
+ partitionGroupIdToSegmentsMap);
}
}
return newAssignment;
@@ -364,8 +360,7 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
// Replica-group based assignment
// Uniformly spray the segment partitions over the instance partitions
- int segmentPartitionId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_realtimeTableName, _helixManager, _partitionColumn);
+ int segmentPartitionId = new
LLCSegmentName(segmentName).getPartitionGroupId();
int numPartitions = instancePartitions.getNumPartitions();
int partitionGroupId = segmentPartitionId % numPartitions;
return SegmentAssignmentUtils
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 324ea00..058f574 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -21,8 +21,8 @@ package org.apache.pinot.controller.helix.core.util;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
import
org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
@@ -35,24 +35,24 @@ public class ZKMetadataUtils {
private ZKMetadataUtils() {
}
- public static void updateSegmentMetadata(SegmentZKMetadata
segmentZKMetadata, SegmentMetadata segmentMetadata,
- SegmentType segmentType) {
- segmentZKMetadata.setSegmentName(segmentMetadata.getName());
- segmentZKMetadata.setTableName(segmentMetadata.getTableName());
- segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
- segmentZKMetadata.setSegmentType(segmentType);
+ public static void updateSegmentMetadata(OfflineSegmentZKMetadata
offlineSegmentZKMetadata,
+ SegmentMetadata segmentMetadata) {
+ offlineSegmentZKMetadata.setSegmentName(segmentMetadata.getName());
+ offlineSegmentZKMetadata.setTableName(segmentMetadata.getTableName());
+ offlineSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
+ offlineSegmentZKMetadata.setSegmentType(SegmentType.OFFLINE);
if (segmentMetadata.getTimeInterval() != null) {
- segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
- segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
- segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+ offlineSegmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
+ offlineSegmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
+ offlineSegmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
}
- segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
- segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
- segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+ offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
+
offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
- segmentZKMetadata.getCustomMap());
-
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+ offlineSegmentZKMetadata.getCustomMap());
+
offlineSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
// Extract column partition metadata (if any), and set it into segment ZK
metadata.
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
@@ -73,7 +73,7 @@ public class ZKMetadataUtils {
}
if (!columnPartitionMap.isEmpty()) {
- segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(columnPartitionMap));
+ offlineSegmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(columnPartitionMap));
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 722e551..fac5c18 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -61,8 +61,7 @@ public class PinotSegmentRestletResourceTest {
// Upload Segments
for (int i = 0; i < 5; ++i) {
SegmentMetadata segmentMetadata =
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
- ControllerTestUtils.getHelixResourceManager()
-
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
segmentMetadata, "downloadUrl");
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
segmentMetadata, "downloadUrl");
segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
}
@@ -72,8 +71,7 @@ public class PinotSegmentRestletResourceTest {
// Add more segments
for (int i = 0; i < 5; ++i) {
SegmentMetadata segmentMetadata =
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
- ControllerTestUtils.getHelixResourceManager()
-
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
segmentMetadata, "downloadUrl");
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
segmentMetadata, "downloadUrl");
segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 3c46bda..ba8898e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -54,7 +54,7 @@ public class TableViewsTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(2).build();
Assert.assertEquals(ControllerTestUtils.getHelixManager().getInstanceType(),
InstanceType.CONTROLLER);
ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME),
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
OFFLINE_SEGMENT_NAME), "downloadUrl");
// Create the hybrid table
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 9f57faf..597c746 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -41,7 +40,6 @@ import static org.testng.Assert.fail;
public class ZKOperatorTest {
private static final String TABLE_NAME = "operatorTestTable";
- private static final String TABLE_NAME_WITH_TYPE =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
private static final String SEGMENT_NAME = "testSegment";
@BeforeClass
@@ -61,7 +59,7 @@ public class ZKOperatorTest {
when(segmentMetadata.getCrc()).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
- zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE,
segmentMetadata, null, null, false, httpHeaders, "downloadUrl",
+ zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders, "downloadUrl",
false, "crypter");
OfflineSegmentZKMetadata segmentZKMetadata =
@@ -77,7 +75,7 @@ public class ZKOperatorTest {
// Refresh the segment with unmatched IF_MATCH field
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
try {
- zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE,
segmentMetadata, null, null, false, httpHeaders,
+ zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders,
"otherDownloadUrl", false, null);
fail();
} catch (Exception e) {
@@ -88,7 +86,7 @@ public class ZKOperatorTest {
// downloadURL and crypter
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
- zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE,
segmentMetadata, null, null, false, httpHeaders,
+ zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders,
"otherDownloadUrl", false, "otherCrypter");
segmentZKMetadata = ControllerTestUtils
.getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME,
SEGMENT_NAME);
@@ -110,7 +108,7 @@ public class ZKOperatorTest {
// 1 second delay to avoid "org.apache.helix.HelixException: Specified
EXTERNALVIEW operatorTestTable_OFFLINE is
// not found!" exception from being thrown sporadically.
Thread.sleep(1000L);
- zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE,
segmentMetadata, null, null, false, httpHeaders,
+ zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null,
null, false, httpHeaders,
"otherDownloadUrl", false, "otherCrypter");
segmentZKMetadata = ControllerTestUtils
.getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME,
SEGMENT_NAME);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index 14523a3..726e2d0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -70,7 +70,7 @@ public class ControllerInstanceToggleTest {
// Add segments
for (int i = 0; i < ControllerTestUtils.NUM_SERVER_INSTANCES; i++) {
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(RAW_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
"downloadUrl");
Assert.assertEquals(
ControllerTestUtils
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
index 929d651..616b491 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
@@ -26,7 +26,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
@@ -65,9 +64,8 @@ public class ControllerSentinelTestV2 {
Assert.assertEquals(
ControllerTestUtils
.getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
TABLE_NAME + "_OFFLINE").getNumPartitions(), i);
- ControllerTestUtils.getHelixResourceManager()
-
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
- SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
"downloadUrl");
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
+ "downloadUrl");
Assert.assertEquals(
ControllerTestUtils
.getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
TABLE_NAME + "_OFFLINE").getNumPartitions(),
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index ea5326e..e1fc87b 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -18,23 +18,16 @@
*/
package org.apache.pinot.controller.helix;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerTestUtils;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
@@ -44,30 +37,15 @@ import org.testng.annotations.Test;
public class PinotResourceManagerTest {
- private static final String OFFLINE_TABLE_NAME =
"offlineResourceManagerTestTable_OFFLINE";
- private static final String REALTIME_TABLE_NAME =
"realtimeResourceManagerTestTable_REALTIME";
- private static final String NUM_REPLICAS_STRING = "2";
- private static final String PARTITION_COLUMN = "Partition_Column";
+ private static final String TABLE_NAME = "resourceManagerTestTable";
@BeforeClass
public void setUp() throws Exception {
ControllerTestUtils.setupClusterAndValidate();
- // Adding an offline table
- TableConfig offlineTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build();
- ControllerTestUtils.getHelixResourceManager().addTable(offlineTableConfig);
-
- // Adding an upsert enabled realtime table which consumes from a stream
with 2 partitions
- Schema dummySchema =
ControllerTestUtils.createDummySchema(REALTIME_TABLE_NAME);
- ControllerTestUtils.addSchema(dummySchema);
- Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
- TableConfig realtimeTableConfig = new
TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).
-
setTableName(REALTIME_TABLE_NAME).setSchemaName(dummySchema.getSchemaName()).build();
-
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
- realtimeTableConfig.getValidationConfig()
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- realtimeTableConfig.setUpsertConfig(new
UpsertConfig((UpsertConfig.Mode.FULL)));
-
ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
+ // Adding table
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
}
@Test
@@ -77,21 +55,21 @@ public class PinotResourceManagerTest {
// Segment ZK metadata does not exist
Assert.assertFalse(
-
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
+ "_OFFLINE", segmentZKMetadata, 0));
+
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME +
"_OFFLINE", segmentZKMetadata, 0));
// Set segment ZK metadata
Assert.assertTrue(
-
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
+ "_OFFLINE", segmentZKMetadata));
+
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME +
"_OFFLINE", segmentZKMetadata));
// Update ZK metadata
Assert.assertEquals(
-
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME
+ "_OFFLINE", "testSegment").getVersion(), 0);
+
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME
+ "_OFFLINE", "testSegment").getVersion(), 0);
Assert.assertTrue(
-
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
+ "_OFFLINE", segmentZKMetadata, 0));
+
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME +
"_OFFLINE", segmentZKMetadata, 0));
Assert.assertEquals(
-
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME
+ "_OFFLINE", "testSegment").getVersion(), 1);
+
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME
+ "_OFFLINE", "testSegment").getVersion(), 1);
Assert.assertFalse(
-
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
+ "_OFFLINE", segmentZKMetadata, 0));
+
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME +
"_OFFLINE", segmentZKMetadata, 0));
}
/**
@@ -104,12 +82,11 @@ public class PinotResourceManagerTest {
@Test
public void testBasicAndConcurrentAddingAndDeletingSegments() throws
Exception {
- final String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME);
+ final String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
// Basic add/delete case
for (int i = 1; i <= 2; i++) {
- ControllerTestUtils.getHelixResourceManager().addNewSegment(
- OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
"downloadUrl");
}
IdealState idealState = ControllerTestUtils
@@ -131,8 +108,8 @@ public class PinotResourceManagerTest {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
-
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
"downloadUrl");
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
"downloadUrl");
}
}
});
@@ -161,54 +138,6 @@ public class PinotResourceManagerTest {
Assert.assertEquals(idealState.getPartitionSet().size(), 0);
}
- @Test
- public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() {
- // Add three segments: two from partition 0 and 1 from partition 1;
- String partition0Segment0 = "realtimeResourceManagerTestTable__aa";
- String partition0Segment1 = "realtimeResourceManagerTestTable__bb";
- String partition1Segment1 = "realtimeResourceManagerTestTable__cc";
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME,
partition0Segment0, PARTITION_COLUMN, 0),
- "downloadUrl");
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME,
partition0Segment1, PARTITION_COLUMN, 0),
- "downloadUrl");
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
SegmentMetadataMockUtils
- .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME,
partition1Segment1, PARTITION_COLUMN, 1),
- "downloadUrl");
- Map<String, Integer> segment2PartitionId = new HashMap<>();
- segment2PartitionId.put(partition0Segment0, 0);
- segment2PartitionId.put(partition0Segment1, 0);
- segment2PartitionId.put(partition1Segment1, 1);
-
- IdealState idealState = ControllerTestUtils.getHelixAdmin()
- .getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
- TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME));
- Set<String> segments = idealState.getPartitionSet();
- Assert.assertEquals(segments.size(), 5);
- Assert.assertTrue(segments.contains(partition0Segment0));
- Assert.assertTrue(segments.contains(partition0Segment1));
- Assert.assertTrue(segments.contains(partition1Segment1));
-
- // Check the segments of the same partition is assigned to the same set of
servers.
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
- for (String segment : segments) {
- Integer partitionId;
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
- partitionId = new LLCSegmentName(segment).getPartitionGroupId();
- } else {
- partitionId = segment2PartitionId.get(segment);
- }
- Assert.assertNotNull(partitionId);
- Set<String> instances = idealState.getInstanceSet(segment);
- if (segmentAssignment.containsKey(partitionId)) {
- Assert.assertEquals(instances, segmentAssignment.get(partitionId));
- } else {
- segmentAssignment.put(partitionId, instances);
- }
- }
- }
-
@AfterClass
public void tearDown() {
ControllerTestUtils.cleanup();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 196633a..e2393e3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -105,7 +105,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
// Add the segments
int numSegments = 10;
for (int i = 0; i < numSegments; i++) {
- _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ _helixResourceManager.addNewSegment(RAW_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i), null);
}
Map<String, Map<String, String>> oldSegmentAssignment =
@@ -343,7 +343,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
// keep decreasing end time from today in steps of 3. 3 segments don't
move. 3 segment on tierA. 4 segments on tierB
for (int i = 0; i < numSegments; i++) {
- _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
SegmentMetadataMockUtils
+ _helixResourceManager.addNewSegment(TIERED_TABLE_NAME,
SegmentMetadataMockUtils
.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME,
SEGMENT_NAME_PREFIX + i, nowInDays), null);
nowInDays -= 3;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index cc7ecaf..6d52c03 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -72,7 +72,7 @@ public class RetentionManagerTest {
for (int i = 0; i < numOlderSegments; ++i) {
SegmentMetadata segmentMetadata = mockSegmentMetadata(pastTimeStamp,
pastTimeStamp, timeUnit);
OfflineSegmentZKMetadata offlineSegmentZKMetadata = new
OfflineSegmentZKMetadata();
- ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+ ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata);
metadataList.add(offlineSegmentZKMetadata);
removedSegments.add(offlineSegmentZKMetadata.getSegmentName());
}
@@ -81,7 +81,7 @@ public class RetentionManagerTest {
SegmentMetadata segmentMetadata =
mockSegmentMetadata(dayAfterTomorrowTimeStamp,
dayAfterTomorrowTimeStamp, timeUnit);
OfflineSegmentZKMetadata offlineSegmentZKMetadata = new
OfflineSegmentZKMetadata();
- ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+ ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata,
segmentMetadata);
metadataList.add(offlineSegmentZKMetadata);
}
final TableConfig tableConfig = createOfflineTableConfig();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 95e400a..1e0e210 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -19,12 +19,9 @@
package org.apache.pinot.controller.utils;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.segment.local.partition.MurmurPartitionFunction;
import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
import
org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -78,7 +75,6 @@ public class SegmentMetadataMockUtils {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
Set<Integer> partitions = Collections.singleton(partitionNumber);
when(columnMetadata.getPartitions()).thenReturn(partitions);
- when(columnMetadata.getPartitionFunction()).thenReturn(new
MurmurPartitionFunction(5));
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
if (columnName != null) {
@@ -87,10 +83,6 @@ public class SegmentMetadataMockUtils {
when(segmentMetadata.getTableName()).thenReturn(tableName);
when(segmentMetadata.getName()).thenReturn(segmentName);
when(segmentMetadata.getCrc()).thenReturn("0");
-
- Map<String, ColumnMetadata> columnMetadataMap = new HashMap<>();
- columnMetadataMap.put(columnName, columnMetadata);
- when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
return segmentMetadata;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 1296e38..86dbaf8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -50,7 +50,7 @@ import static org.testng.Assert.assertEquals;
*/
public class ValidationManagerTest {
private static final String TEST_TABLE_NAME = "validationTable";
- private static final String OFFLINE_TEST_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
+ private static final String TEST_TABLE_TWO = "validationTable2";
private static final String TEST_SEGMENT_NAME = "testSegment";
private TableConfig _offlineTableConfig;
@@ -68,7 +68,7 @@ public class ValidationManagerTest {
public void testPushTimePersistence() {
SegmentMetadata segmentMetadata =
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
-
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TEST_TABLE_NAME,
segmentMetadata, "downloadUrl");
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(TEST_TABLE_NAME,
segmentMetadata, "downloadUrl");
OfflineSegmentZKMetadata offlineSegmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getOfflineSegmentZKMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
long pushTime = offlineSegmentZKMetadata.getPushTime();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5d4d43d..83beeb2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -45,7 +45,6 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
@@ -263,24 +262,24 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
Preconditions.checkNotNull(schema);
- File segmentDir = new File(_indexDir, segmentName);
+ File indexDir = new File(_indexDir, segmentName);
// Restart during segment reload might leave segment in inconsistent state
(index directory might not exist but
// segment backup directory existed), need to first try to recover from
reload failure before checking the existence
// of the index directory and loading segment from it
- LoaderUtils.reloadFailureRecovery(segmentDir);
+ LoaderUtils.reloadFailureRecovery(indexDir);
boolean isLLCSegment =
SegmentName.isLowLevelConsumerSegmentName(segmentName);
- if (segmentDir.exists()) {
+ if (indexDir.exists()) {
// Segment already exists on disk
if (realtimeSegmentZKMetadata.getStatus() == Status.DONE) {
// Metadata has been committed, load the local segment
try {
- addSegment(ImmutableSegmentLoader.load(segmentDir,
indexLoadingConfig, schema));
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
return;
} catch (Exception e) {
if (isLLCSegment) {
// For LLC and segments, delete the local copy and download a new
copy from the controller
- FileUtils.deleteQuietly(segmentDir);
+ FileUtils.deleteQuietly(indexDir);
if (e instanceof V3RemoveIndexException) {
_logger.info("Unable to remove index from V3 format segment: {},
downloading a new copy", segmentName, e);
} else {
@@ -294,18 +293,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
} else {
// Metadata has not been committed, delete the local segment
- FileUtils.deleteQuietly(segmentDir);
+ FileUtils.deleteQuietly(indexDir);
}
- } else if (realtimeSegmentZKMetadata.getStatus() == Status.UPLOADED) {
- // The segment is uploaded to an upsert enabled realtime table. Download
the segment and load.
- Preconditions.checkArgument(realtimeSegmentZKMetadata instanceof
LLCRealtimeSegmentZKMetadata,
- "Upload segment is not LLC segment");
- String downURL =
((LLCRealtimeSegmentZKMetadata)realtimeSegmentZKMetadata).getDownloadUrl();
- Preconditions.checkNotNull(downURL, "Upload segment metadata has no
download url");
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, downURL);
- _logger.info("Downloaded, untarred and add segment {} of table {} from
{}", segmentName, tableConfig.getTableName(),
- downURL);
- return;
}
// Start a new consuming segment or download the segment from the
controller
@@ -361,8 +350,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
columnToReaderMap.put(_timeColumnName, new
PinotSegmentColumnReader(immutableSegment, _timeColumnName));
int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
String segmentName = immutableSegment.getSegmentName();
- int partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, this.getTableName(),
_helixManager, _primaryKeyColumns.get(0));
+ int partitionGroupId = new
LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId();
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
int numPrimaryKeyColumns = _primaryKeyColumns.size();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 23efd0e..5486a43 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -38,15 +38,10 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
-import org.apache.pinot.spi.config.table.RoutingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -365,26 +360,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
}
/**
- * Creates a new Upsert enabled table config.
- */
- protected TableConfig createUpsertTableConfig(File sampleAvroFile, String
primaryKeyColumn, int numPartitions) {
- AvroFileSchemaKafkaAvroMessageDecoder.avroFile = sampleAvroFile;
- Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new
HashMap<>();
- columnPartitionConfigMap.put(primaryKeyColumn, new
ColumnPartitionConfig("Murmur", numPartitions));
-
- return new
TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
- .setTimeColumnName(getTimeColumnName())
-
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
-
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(useLlc())
-
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
- .setUpsertConfig(new UpsertConfig((UpsertConfig.Mode.FULL))).build();
- }
-
- /**
* Returns the REALTIME table config in the cluster.
*/
protected TableConfig getRealtimeTableConfig() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
index 87eafb6..cc01ae6 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
@@ -182,9 +182,9 @@ public class BasicAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest
// download and sanity-check size of offline segment(s)
for (int i = 0; i < offlineSegments.size(); i++) {
String segment = offlineSegments.get(i).asText();
- Assert.assertTrue(sendGetRequest(_controllerRequestURLBuilder
-
.forSegmentDownload(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()),
segment), AUTH_HEADER)
- .length() > 200000); // download segment
+ Assert.assertTrue(
+
sendGetRequest(_controllerRequestURLBuilder.forSegmentDownload(getTableName(),
segment), AUTH_HEADER).length()
+ > 200000); // download segment
}
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
deleted file mode 100644
index 0633d52..0000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.helix.model.IdealState;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-public class UpsertTableSegmentUploadIntegrationTest extends
BaseClusterIntegrationTestSet {
- private static final int NUM_BROKERS = 1;
- private static final int NUM_SERVERS = 2;
- // Segment 1 contains records of pk value 100000
- private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
- // Segment 2 contains records of pk value 100001
- private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
- // Segment 3 contains records of pk value 100000
- private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
- private static final String PRIMARY_KEY_COL = "clientId";
- private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
-
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start the Pinot cluster
- startZk();
- // Start a customized controller with more frequent realtime segment
validation
- startController();
- startBrokers(getNumBrokers());
- startServers(NUM_SERVERS);
-
- // Start Kafka
- startKafka();
-
- // Create and upload the schema.
- Schema schema = createSchema();
- addSchema(schema);
-
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
- // Push data to Kafka
- pushAvroIntoKafka(avroFiles);
- // Create and upload the table config
- TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, getNumKafkaPartitions());
- addTableConfig(upsertTableConfig);
-
- // Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles,
upsertTableConfig, schema, 0, _segmentDir, _tarDir);
- uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
-
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
- }
-
- @Override
- protected String getSchemaFileName() {
- return "upsert_table_test.schema";
- }
-
- @Override
- protected String getSchemaName() {
- return "upsertSchema";
- }
-
- @Override
- protected String getAvroTarFileName() {
- return "upsert_test.tar.gz";
- }
-
- @Override
- protected boolean useLlc() {
- return true;
- }
-
- protected int getNumBrokers() {
- return NUM_BROKERS;
- }
-
- @Override
- protected long getCountStarResult() {
- // Three distinct records are expected with pk values of 100000, 100001,
100002
- return 3;
- }
-
- @Override
- protected String getPartitionColumn() {
- return PRIMARY_KEY_COL;
- }
-
- @Override
- protected void startController() {
- Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
- // Perform realtime segment validation every second with 1 second initial
delay.
- controllerConfig
-
.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
1);
-
controllerConfig.put(ControllerConf.ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
1);
- controllerConfig
-
.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
1);
- startController(controllerConfig);
- }
-
- @Test
- public void testSegmentAssignment()
- throws Exception {
- IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
TABLE_NAME_WITH_TYPE);
- Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
- verifyTableIdealStates(idealState);
- // Wait 3 seconds to let the realtime validation thread to run.
- Thread.sleep(3000);
- // Verify the result again.
- Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
- verifyTableIdealStates(idealState);
- }
-
- private void verifyTableIdealStates(IdealState idealState) {
- // Verify various ideal state properties
- Set<String> segments = idealState.getPartitionSet();
- Assert.assertEquals(segments.size(), 5);
- Map<String, Integer> segment2PartitionId = new HashMap<>();
- segment2PartitionId.put(UPLOADED_SEGMENT_1, 0);
- segment2PartitionId.put(UPLOADED_SEGMENT_2, 1);
- segment2PartitionId.put(UPLOADED_SEGMENT_3, 1);
-
- // Verify that all segments of the same partition are mapped to the same
single server.
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
- for (String segment : segments) {
- Integer partitionId;
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
- partitionId = new LLCSegmentName(segment).getPartitionGroupId();
- } else {
- partitionId = segment2PartitionId.get(segment);
- }
- Assert.assertNotNull(partitionId);
- Set<String> instances = idealState.getInstanceSet(segment);
- Assert.assertEquals(1, instances.size());
- if (segmentAssignment.containsKey(partitionId)) {
- Assert.assertEquals(instances, segmentAssignment.get(partitionId));
- } else {
- segmentAssignment.put(partitionId, instances);
- }
- }
- }
-
- private void uploadSegments(String tableName, TableType tableType, File
tarDir)
- throws Exception {
- File[] segmentTarFiles = tarDir.listFiles();
- assertNotNull(segmentTarFiles);
- int numSegments = segmentTarFiles.length;
- assertTrue(numSegments > 0);
-
- URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
- try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
- if (numSegments == 1) {
- File segmentTarFile = segmentTarFiles[0];
- assertEquals(fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName, tableType)
- .getStatusCode(), HttpStatus.SC_OK);
- } else {
- // Upload all segments in parallel
- ExecutorService executorService =
Executors.newFixedThreadPool(numSegments);
- List<Future<Integer>> futures = new ArrayList<>(numSegments);
- for (File segmentTarFile : segmentTarFiles) {
- futures.add(executorService.submit(() -> {
- return fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName, tableType)
- .getStatusCode();
- }));
- }
- executorService.shutdown();
- for (Future<Integer> future : futures) {
- assertEquals((int) future.get(), HttpStatus.SC_OK);
- }
- }
- }
- }
-}
diff --git
a/pinot-integration-tests/src/test/resources/upsert_table_test.schema
b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
deleted file mode 100644
index 3f656f7..0000000
--- a/pinot-integration-tests/src/test/resources/upsert_table_test.schema
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- "dimensionFieldSpecs": [
- {
- "dataType": "INT",
- "singleValueField": true,
- "name": "clientId"
- },
- {
- "dataType": "STRING",
- "singleValueField": true,
- "name": "city"
- },
- {
- "dataType": "STRING",
- "singleValueField": true,
- "name": "description"
- },
- {
- "dataType": "INT",
- "singleValueField": true,
- "name": "salary"
- }
- ],
- "timeFieldSpec": {
- "incomingGranularitySpec": {
- "timeType": "DAYS",
- "dataType": "INT",
- "name": "DaysSinceEpoch"
- }
- },
- "primaryKeyColumns": ["clientId"],
- "schemaName": "upsertSchema"
-}
diff --git a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz
b/pinot-integration-tests/src/test/resources/upsert_test.tar.gz
deleted file mode 100644
index 0bd0dc8..0000000
Binary files a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz
and /dev/null differ
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 8e02f85..9289f9c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -118,8 +118,6 @@ public class PartitionUpsertMetadataManager {
// timestamp, but the segment has a larger sequence number (the
segment is newer than the current segment).
if (recordInfo._timestamp > currentRecordLocation.getTimestamp()
|| (
recordInfo._timestamp == currentRecordLocation.getTimestamp()
- &&
LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
- &&
LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
&& LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 534a3a0..6a5d74a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -394,12 +394,7 @@ public class CommonConstants {
public static class Segment {
public static class Realtime {
public enum Status {
- // Means the segment is in CONSUMING state.
- IN_PROGRESS,
- // Means the segment is in ONLINE state (segment completed consuming
and has been saved in segment store).
- DONE,
- // Means the segment is uploaded to a Pinot controller by an external
party.
- UPLOADED
+ IN_PROGRESS, DONE
}
/**
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 5afec65..5588a8c 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -329,9 +329,10 @@ public class PerfBenchmarkDriver {
*
* @param segmentMetadata segment metadata.
*/
- public void addSegment(String tableNameWithType, SegmentMetadata
segmentMetadata) {
+ public void addSegment(String tableName, SegmentMetadata segmentMetadata) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
_helixResourceManager
- .addNewSegment(tableNameWithType, segmentMetadata, "http://" +
_controllerAddress + "/" + segmentMetadata.getName());
+ .addNewSegment(rawTableName, segmentMetadata, "http://" +
_controllerAddress + "/" + segmentMetadata.getName());
}
public static void waitForExternalViewUpdate(String zkAddress, final String
clusterName, long timeoutInMilliseconds) {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
index 8f2796e..a785a92 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
@@ -62,7 +62,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand
implements Command
@Option(name = "-timeoutInSeconds", required = false, metaVar = "<int>",
usage = "Timeout in seconds for batch load (default 60).")
private int _timeoutInSeconds = 60;
- @Option(name = "-tableNames", required = false, metaVar = "<String>", usage
= "Comma separated table names with types to be loaded (non-batch load).")
+ @Option(name = "-tableNames", required = false, metaVar = "<String>", usage
= "Comma separated table names to be loaded (non-batch load).")
private String _tableNames;
@Option(name = "-invertedIndexColumns", required = false, metaVar =
"<String>", usage = "Comma separated inverted index columns to be created
(non-batch load).")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]