This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4000568 Fix race conditions between segment merge/roll-up and purge
(or convertToRawIndex) tasks: (#7427)
4000568 is described below
commit 400056804fc1c0da73c5ec78ca3fe1a1bcab9231
Author: Jiapeng Tao <[email protected]>
AuthorDate: Thu Sep 23 17:22:32 2021 -0700
Fix race conditions between segment merge/roll-up and purge (or
convertToRawIndex) tasks: (#7427)
1. Add a REFRESH_ONLY header for purge and convertToRawIndex tasks; the segment
upload API will abort the request if the segment does not exist or is deleted
before the upload request is completed.
2. Honor segment lineage in ConvertToRawIndexTaskGenerator.
---
.../SegmentLineageBasedSegmentPreSelector.java | 2 +-
.../broker/broker/HelixBrokerStarterTest.java | 3 +-
.../pinot/common/lineage/SegmentLineageUtils.java | 2 +-
.../common/utils/FileUploadDownloadClient.java | 1 +
.../PinotSegmentUploadDownloadRestletResource.java | 22 +++++++
.../pinot/controller/api/upload/ZKOperator.java | 24 ++++++--
.../helix/core/PinotHelixResourceManager.java | 5 +-
.../validation/ValidationManagerTest.java | 3 +-
.../tests/OfflineClusterIntegrationTest.java | 71 ++++++++++++++++++++++
.../tasks/BaseSingleSegmentConversionExecutor.java | 4 ++
.../ConvertToRawIndexTaskGenerator.java | 19 +++++-
.../mergerollup/MergeRollupTaskGenerator.java | 2 +-
12 files changed, 146 insertions(+), 12 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
index cd41c6e..b9b4092 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
@@ -44,7 +44,7 @@ public class SegmentLineageBasedSegmentPreSelector implements
SegmentPreSelector
@Override
public Set<String> preSelect(Set<String> onlineSegments) {
SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
_tableNameWithType);
- SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(onlineSegments,
segmentLineage);
+ SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(onlineSegments,
segmentLineage);
return onlineSegments;
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 9d667f2..6464e46 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -65,6 +65,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
private static final int NUM_BROKERS = 3;
private static final int NUM_SERVERS = 1;
private static final int NUM_OFFLINE_SEGMENTS = 5;
+ private static final int EXPECTED_VERSION = -1;
private HelixBrokerStarter _brokerStarter;
@@ -215,7 +216,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
_helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME,
segmentToRefresh);
_helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME,
segmentToRefresh, newEndTime),
- segmentZKMetadata, "downloadUrl", null);
+ segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
TestUtils.waitForCondition(aVoid ->
routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
.equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update
the time boundary for refreshed segment");
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
index 0a99509..458b247 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -43,7 +43,7 @@ public class SegmentLineageUtils {
* Use the segment lineage metadata to filter out either merged segments or
original segments in place
* to make sure that the final segments contain no duplicate data.
*/
- public static void filterSegmentsBasedOnLineageInplace(Set<String> segments,
SegmentLineage segmentLineage) {
+ public static void filterSegmentsBasedOnLineageInPlace(Set<String> segments,
SegmentLineage segmentLineage) {
if (segmentLineage != null) {
for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry =
segmentLineage.getLineageEntry(lineageEntryId);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 0e3ce23..bcd71ae 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -85,6 +85,7 @@ public class FileUploadDownloadClient implements Closeable {
public static class CustomHeaders {
public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
+ public static final String REFRESH_ONLY = "REFRESH_ONLY";
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER =
"Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 9066ff5..f2c6c45 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -23,6 +23,8 @@ import com.google.common.base.Strings;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -414,6 +416,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@Path("/segments")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+ @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
// We use this endpoint with URI upload because a request sent with the
multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint does not move
the segment to its final location;
// it keeps it at the downloadURI header that is set. We will not support
this endpoint going forward.
@@ -442,6 +449,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@Path("/segments")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as
binary")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+ @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
// For the multipart endpoint, we will always move segment to final location
regardless of the segment endpoint.
public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
@@ -468,6 +480,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@Path("/v2/segments")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+ @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
// We use this endpoint with URI upload because a request sent with the
multipart content type will reject the POST
// request if a multipart object is not sent. This endpoint is recommended
for use. It differs from the first
// endpoint in how it moves the segment to a Pinot-determined final
directory.
@@ -496,6 +513,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@Path("/v2/segments")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as
binary")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+ @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
// This behavior does not differ from v1 of the same endpoint.
public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
@ApiParam(value = "Name of the table")
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 9d6bf01..b3a7258 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -65,7 +65,14 @@ public class ZKOperator {
String segmentName = segmentMetadata.getName();
ZNRecord segmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+ boolean refreshOnly =
+
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
if (segmentMetadataZNRecord == null) {
+ if (refreshOnly) {
+ throw new ControllerApplicationException(LOGGER,
+ "Cannot refresh non-existing segment, aborted uploading segment: "
+ segmentName + " of table: "
+ + tableNameWithType, Response.Status.GONE);
+ }
LOGGER.info("Adding new segment {} from table {}", segmentName,
tableNameWithType);
processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, headers,
crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation);
@@ -93,6 +100,7 @@ public class ZKOperator {
SegmentZKMetadata existingSegmentZKMetadata = new
SegmentZKMetadata(znRecord);
long existingCrc = existingSegmentZKMetadata.getCrc();
+ int expectedVersion = znRecord.getVersion();
// Check if CRC match when IF-MATCH header is set
checkCRC(headers, tableNameWithType, segmentName, existingCrc);
@@ -118,10 +126,13 @@ public class ZKOperator {
// Lock the segment by setting the upload start time in ZK
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
if (!_pinotHelixResourceManager
- .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
znRecord.getVersion())) {
+ .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
throw new ControllerApplicationException(LOGGER,
"Failed to lock the segment: " + segmentName + " of table: " +
tableNameWithType + ", retry later",
Response.Status.CONFLICT);
+ } else {
+ // The version will increment if the zk metadata update is successful
+ expectedVersion++;
}
}
@@ -156,7 +167,10 @@ public class ZKOperator {
// (creation time is not included in the crc)
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
- if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
existingSegmentZKMetadata)) {
+ // NOTE: in rare cases the segment can be deleted before the metadata
is updated and the expected version won't
+ // match; we should fail the request in such cases
+ if (!_pinotHelixResourceManager
+ .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
throw new RuntimeException(
"Failed to update ZK metadata for segment: " + segmentName + "
of table: " + tableNameWithType);
}
@@ -175,10 +189,12 @@ public class ZKOperator {
}
_pinotHelixResourceManager
- .refreshSegment(tableNameWithType, segmentMetadata,
existingSegmentZKMetadata, zkDownloadURI, crypter);
+ .refreshSegment(tableNameWithType, segmentMetadata,
existingSegmentZKMetadata, expectedVersion,
+ zkDownloadURI, crypter);
}
} catch (Exception e) {
- if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
existingSegmentZKMetadata)) {
+ if (!_pinotHelixResourceManager
+ .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
LOGGER.error("Failed to update ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
}
throw e;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cbc2651..25797c3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1788,7 +1788,7 @@ public class PinotHelixResourceManager {
}
public void refreshSegment(String tableNameWithType, SegmentMetadata
segmentMetadata,
- SegmentZKMetadata segmentZKMetadata, String downloadUrl, @Nullable
String crypter) {
+ SegmentZKMetadata segmentZKMetadata, int expectedVersion, String
downloadUrl, @Nullable String crypter) {
String segmentName = segmentMetadata.getName();
// NOTE: Must first set the segment ZK metadata before trying to refresh
because servers and brokers rely on segment
@@ -1800,7 +1800,8 @@ public class PinotHelixResourceManager {
segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
segmentZKMetadata.setDownloadUrl(downloadUrl);
segmentZKMetadata.setCrypterName(crypter);
- if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentZKMetadata)) {
+ if (!ZKMetadataProvider
+ .setSegmentZKMetadata(_propertyStore, tableNameWithType,
segmentZKMetadata, expectedVersion)) {
throw new RuntimeException(
"Failed to update ZK metadata for segment: " + segmentName + " of
table: " + tableNameWithType);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index fee669e..335d8f4 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -51,6 +51,7 @@ public class ValidationManagerTest {
private static final String TEST_TABLE_NAME = "validationTable";
private static final String OFFLINE_TEST_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
private static final String TEST_SEGMENT_NAME = "testSegment";
+ private static final int EXPECTED_VERSION = -1;
private TableConfig _offlineTableConfig;
@@ -88,7 +89,7 @@ public class ValidationManagerTest {
}, 30_000L, "Failed to find the segment in the ExternalView");
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
ControllerTestUtils.getHelixResourceManager()
- .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata,
"downloadUrl", null);
+ .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata,
EXPECTED_VERSION, "downloadUrl", null);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 10966e7..7f3652a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
@@ -37,13 +38,21 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
+import org.apache.http.Header;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -78,6 +87,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
private static final int NUM_SERVERS = 1;
private static final int NUM_SEGMENTS = 12;
private static final long ONE_HOUR_IN_MS = TimeUnit.HOURS.toMillis(1);
+ private static final String SEGMENT_UPLOAD_TEST_TABLE =
"segmentUploadTestTable";
// For table config refresh test, make an expensive query to ensure the
query won't finish in 5ms
private static final String TEST_TIMEOUT_QUERY =
@@ -324,6 +334,67 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
}
+ @Test
+ public void testUploadSegmentRefreshOnly()
+ throws Exception {
+ TableConfig segmentUploadTestTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE).setSchemaName(getSchemaName())
+
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+ .setSegmentVersion(getSegmentVersion())
+
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+ .setNullHandlingEnabled(getNullHandlingEnabled()).build();
+ addTableConfig(segmentUploadTestTableConfig);
+ String offlineTableName = segmentUploadTestTableConfig.getTableName();
+ File[] segmentTarFiles = _tarDir.listFiles();
+ assertNotNull(segmentTarFiles);
+ int numSegments = segmentTarFiles.length;
+ assertTrue(numSegments > 0);
+ List<Header> headers = new ArrayList<>();
+ headers.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true"));
+ List<NameValuePair> parameters = new ArrayList<>();
+ NameValuePair tableNameParameter = new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+ TableNameBuilder.extractRawTableName(offlineTableName));
+ parameters.add(tableNameParameter);
+
+ URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+ try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
+ // Refresh non-existing segment
+ File segmentTarFile = segmentTarFiles[0];
+ try {
+ fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, headers, parameters,
+ FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ fail();
+ } catch (HttpErrorStatusException e) {
+ assertEquals(e.getStatusCode(), HttpStatus.SC_GONE);
+
assertTrue(_helixResourceManager.getSegmentsZKMetadata(SEGMENT_UPLOAD_TEST_TABLE).isEmpty());
+ }
+
+ // Upload segment
+ SimpleHttpResponse response = fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, null, parameters,
+ FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+ System.out.println(response.getResponse());
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+ assertEquals(segmentsZKMetadata.size(), 1);
+
+ // Refresh existing segment
+ response = fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, headers, parameters,
+ FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+ segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+ assertEquals(segmentsZKMetadata.size(), 1);
+ assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(),
Long.MIN_VALUE);
+ }
+ dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+ }
+
@Test(dependsOnMethods = "testRangeIndexTriggering")
public void testInvertedIndexTriggering()
throws Exception {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index db329bd..646cc40 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -133,6 +133,9 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
// the newer segment won't get override
Header ifMatchHeader = new BasicHeader(HttpHeaders.IF_MATCH,
originalSegmentCrc);
+ // Only upload segment if it exists
+ Header refreshOnlyHeader = new
BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true");
+
// Set segment ZK metadata custom map modifier into HTTP header to
modify the segment ZK metadata
// NOTE: even segment is not changed, still need to upload the segment
to update the segment ZK metadata so that
// segment will not be submitted again
@@ -144,6 +147,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
List<Header> httpHeaders = new ArrayList<>();
httpHeaders.add(ifMatchHeader);
+ httpHeaders.add(refreshOnlyHeader);
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
index d0d9d12..5fc7c64 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
@@ -21,10 +21,13 @@ package
org.apache.pinot.plugin.minion.tasks.converttorawindex;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,8 +97,16 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
String columnsToConvertConfig =
taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);
// Generate tasks
+ List<SegmentZKMetadata> offlineSegmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+ SegmentLineage segmentLineage =
_clusterInfoAccessor.getSegmentLineage(offlineTableName);
+ Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
+ for (SegmentZKMetadata offlineSegmentZKMetadata :
offlineSegmentsZKMetadata) {
+
preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
+ }
+
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
segmentLineage);
+
int tableNumTasks = 0;
- for (SegmentZKMetadata segmentZKMetadata :
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName)) {
+ for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
// Generate up to tableMaxNumTasks tasks each time for each table
if (tableNumTasks == tableMaxNumTasks) {
break;
@@ -107,6 +118,12 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
continue;
}
+ // Skip segments based on lineage: for COMPLETED lineage, segments in
`segmentsFrom` will be removed by
+ // the retention manager; for IN_PROGRESS lineage, segments in
`segmentsTo` are not uploaded yet
+ if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
+ continue;
+ }
+
// Only submit segments that have not been converted
Map<String, String> customMap = segmentZKMetadata.getCustomMap();
if (customMap == null || !customMap.containsKey(
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index a1f559e..252505b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -127,7 +127,7 @@ public class MergeRollupTaskGenerator implements
PinotTaskGenerator {
for (SegmentZKMetadata segment : allSegments) {
preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
}
-
SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(preSelectedSegmentsBasedOnLineage,
segmentLineage);
+
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
segmentLineage);
List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
for (SegmentZKMetadata segment : allSegments) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]