This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d2619c1613 Add a new API to fix segment date time in metadata (#9413)
d2619c1613 is described below
commit d2619c1613e143ba4ab81aecb2f2fef3f5dd6539
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Sep 29 17:25:53 2022 +0530
Add a new API to fix segment date time in metadata (#9413)
* Add API to handle change in timestamp format
* Fix linting
* Add support for storing raw segment start and end time in metadata
* Do not store start/end time in segment metadata separately
* remove refresh flag
* Store new start/end time with proper time unit
* Move the API to segment resource
* Fix test failure
* Add test
* Cleanup: Remove duplicate methods
* Store start/end time in milliseconds in zookeeper
* Refactor: change method names and reduce scope of exceptions
* Remove redundant timeunit conversion
* Throw user errors and check for time column
* Fix Segment tests
Co-authored-by: Kartik Khare <[email protected]>
---
.../common/metadata/segment/SegmentZKMetadata.java | 16 +++++
.../api/resources/PinotSegmentRestletResource.java | 68 ++++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 17 ++++++
.../helix/core/util/ZKMetadataUtils.java | 35 ++++++++++-
.../helix/core/PinotHelixResourceManagerTest.java | 46 +++++++++++++++
.../controller/utils/SegmentMetadataMockUtils.java | 10 +++-
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
7 files changed, 189 insertions(+), 5 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index ff9f5530ba..a891ed1829 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -94,6 +94,14 @@ public class SegmentZKMetadata implements ZKMetadata {
return _endTimeMs;
}
+  /**
+   * Returns the raw (unconverted) segment start time stored under {@code segment.start.time.raw}, i.e. the time
+   * column's minimum value in the column's original format when it was recorded. Callers should treat a null or
+   * empty value as "not recorded".
+   */
+  public String getRawStartTime() {
+    String rawStartTime = _simpleFields.get(Segment.RAW_START_TIME);
+    return rawStartTime;
+  }
+
+  /**
+   * Returns the raw (unconverted) segment end time stored under {@code segment.end.time.raw}, i.e. the time
+   * column's maximum value in the column's original format when it was recorded. Callers should treat a null or
+   * empty value as "not recorded".
+   */
+  public String getRawEndTime() {
+    String rawEndTime = _simpleFields.get(Segment.RAW_END_TIME);
+    return rawEndTime;
+  }
+
public void setStartTime(long startTime) {
setNonNegativeValue(Segment.START_TIME, startTime);
_startTimeMsCached = false;
@@ -104,6 +112,14 @@ public class SegmentZKMetadata implements ZKMetadata {
_endTimeMsCached = false;
}
+  /**
+   * Records the raw (unconverted) segment start time in the time column's original format. Unlike
+   * {@link #setStartTime(long)}, this does not touch the cached start time in milliseconds.
+   */
+  public void setRawStartTime(String startTime) {
+    String rawValue = startTime;
+    setValue(Segment.RAW_START_TIME, rawValue);
+  }
+
+  /**
+   * Records the raw (unconverted) segment end time in the time column's original format. Unlike
+   * {@link #setEndTime(long)}, this does not touch the cached end time in milliseconds.
+   */
+  public void setRawEndTime(String endTime) {
+    String rawValue = endTime;
+    setValue(Segment.RAW_END_TIME, rawValue);
+  }
+
public void setTimeUnit(TimeUnit timeUnit) {
setValue(Segment.TIME_UNIT, timeUnit);
_startTimeMsCached = false;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 9fa65836da..5e94c29ac5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -59,6 +59,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -79,7 +80,10 @@ import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.controller.util.TableTierReader;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -1035,4 +1039,68 @@ public class PinotSegmentRestletResource {
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+  /**
+   * Recomputes the start/end time stored in ZK metadata for every segment of a table by re-parsing each segment's
+   * raw start/end time with the format of the table's current time column.
+   *
+   * @param tableNameWithType table name including its type suffix, e.g. {@code myTable_REALTIME}
+   * @return a success response naming the table
+   * @throws ControllerApplicationException with status 400 if the name has no table type suffix; the remaining
+   *     failures (404 / 500) are raised by {@code updateZKTimeIntervalInternal}
+   */
+  @POST
+  @Path("/segments/{tableNameWithType}/updateZKTimeInterval")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based on latest schema",
+      notes = "Update the start and end time of the segments based on latest schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Table type not provided with table name"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == null) {
+      // The endpoint operates on a single physical table, so the type suffix is mandatory.
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Table type not provided with table name %s", tableNameWithType), Status.BAD_REQUEST);
+    }
+    return updateZKTimeIntervalInternal(tableNameWithType);
+  }
+
+  /**
+   * Recomputes the ZK start/end time of all segments of a table using the format of the table's time column.
+   *
+   * <p>Checks, in order, that the table config, the table schema, the validation config's time column name and that
+   * column's field spec all exist, then delegates to {@code PinotHelixResourceManager#updateSegmentsZKTimeInterval}.
+   *
+   * @param tableNameWithType table name including its type suffix
+   * @return a success response naming the table
+   * @throws ControllerApplicationException with status 404 if any prerequisite above is missing, or 500 if the
+   *     segment metadata update itself fails
+   */
+  private SuccessResponse updateZKTimeIntervalInternal(String tableNameWithType) {
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table config for table: " + tableNameWithType,
+          Status.NOT_FOUND);
+    }
+
+    Schema tableSchema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
+    if (tableSchema == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find schema for table: " + tableNameWithType,
+          Status.NOT_FOUND);
+    }
+
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (StringUtils.isEmpty(timeColumn)) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find time column for table : " + tableNameWithType,
+          Status.NOT_FOUND);
+    }
+
+    // The time column's format is what the raw start/end values stored in ZK are parsed with.
+    DateTimeFieldSpec timeColumnFieldSpec = tableSchema.getSpecForTimeColumn(timeColumn);
+    if (timeColumnFieldSpec == null) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find field spec for column: %s and table: %s", timeColumn, tableNameWithType),
+          Status.NOT_FOUND);
+    }
+
+    try {
+      _pinotHelixResourceManager.updateSegmentsZKTimeInterval(tableNameWithType, timeColumnFieldSpec);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to update time interval zk metadata for table %s", tableNameWithType),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+    return new SuccessResponse("Successfully updated time interval zk metadata for table: " + tableNameWithType);
+  }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index af25e423b2..5ca73f145e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -152,6 +152,7 @@ import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.user.ComponentType;
import org.apache.pinot.spi.config.user.RoleType;
import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -1284,6 +1285,17 @@ public class PinotHelixResourceManager {
}
}
+  /**
+   * Recomputes the start/end time in the ZK metadata of every segment of the given table from the raw start/end time
+   * stored in that metadata, parsing the raw values with the format of {@code timeColumnFieldSpec}.
+   *
+   * <p>Each segment is written back with the ZNRecord version read before the update. Segments whose write is
+   * rejected are logged, and an exception is thrown after all segments have been attempted, so callers do not report
+   * success for a partially applied update.
+   *
+   * @param tableNameWithType table name including its type suffix
+   * @param timeColumnFieldSpec field spec of the table's time column
+   * @throws IllegalStateException if the ZK metadata of one or more segments could not be written
+   */
+  public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
+    LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);
+
+    int numFailedSegments = 0;
+    List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      // Capture the version before mutating so the write below is version-checked against what was read.
+      int version = segmentZKMetadata.toZNRecord().getVersion();
+      updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
+      // NOTE(review): assumes updateZkMetadata(String, SegmentZKMetadata, int) returns false when the version check
+      // fails; previously this result was silently discarded.
+      if (!updateZkMetadata(tableNameWithType, segmentZKMetadata, version)) {
+        LOGGER.warn("Failed to update time interval in ZK metadata for segment: {} of table: {} "
+            + "(possibly modified concurrently)", segmentZKMetadata.getSegmentName(), tableNameWithType);
+        numFailedSegments++;
+      }
+    }
+    if (numFailedSegments > 0) {
+      throw new IllegalStateException(String.format(
+          "Failed to update time interval in ZK metadata for %d of %d segments of table: %s", numFailedSegments,
+          segmentZKMetadataList.size(), tableNameWithType));
+    }
+  }
+
public void updateSchema(Schema schema, boolean reload)
throws SchemaNotFoundException, SchemaBackwardIncompatibleException,
TableNotFoundException {
String schemaName = schema.getSchemaName();
@@ -2265,6 +2277,11 @@ public class PinotHelixResourceManager {
}
}
+  /**
+   * Recomputes, in memory only, the start/end time of one segment's ZK metadata from its raw start/end time using the
+   * format of the given time column. The result is not persisted; the caller must write the metadata back to ZK.
+   */
+  public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata, DateTimeFieldSpec timeColumnFieldSpec) {
+    ZKMetadataUtils.updateSegmentZKTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
+  }
+
@VisibleForTesting
public void refreshSegment(String tableNameWithType, SegmentMetadata
segmentMetadata,
SegmentZKMetadata segmentZKMetadata, int expectedVersion, String
downloadUrl) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 0afcb3aa59..306d93d0ae 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -21,13 +21,17 @@ package org.apache.pinot.controller.helix.core.util;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentName;
+import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -56,6 +60,21 @@ public class ZKMetadataUtils {
segmentSizeInBytes, false);
}
+  /**
+   * Recomputes the start/end time of the given segment ZK metadata from the raw start/end time stored in it, parsing
+   * the raw values with the format of {@code dateTimeFieldSpec}. Only the in-memory metadata object is modified.
+   *
+   * <p>The parsed values are milliseconds, so the time unit is set to {@link TimeUnit#MILLISECONDS} alongside them;
+   * otherwise metadata that still carries an older unit (e.g. DAYS) would misinterpret the new values. The metadata
+   * is left untouched unless both raw values are present, so start, end and unit always stay consistent.
+   *
+   * @param segmentZKMetadata segment ZK metadata to update in place
+   * @param dateTimeFieldSpec field spec of the time column whose format describes the raw values
+   */
+  public static void updateSegmentZKTimeInterval(SegmentZKMetadata segmentZKMetadata,
+      DateTimeFieldSpec dateTimeFieldSpec) {
+    String startTimeString = segmentZKMetadata.getRawStartTime();
+    String endTimeString = segmentZKMetadata.getRawEndTime();
+    if (StringUtils.isEmpty(startTimeString) || StringUtils.isEmpty(endTimeString)) {
+      return;
+    }
+
+    // Parse both values before mutating anything so a parse failure leaves the metadata unchanged.
+    long updatedStartTime = dateTimeFieldSpec.getFormatSpec().fromFormatToMillis(startTimeString);
+    long updatedEndTime = dateTimeFieldSpec.getFormatSpec().fromFormatToMillis(endTimeString);
+    segmentZKMetadata.setStartTime(updatedStartTime);
+    segmentZKMetadata.setEndTime(updatedEndTime);
+    segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
+  }
+
private static void updateSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes,
boolean newSegment) {
@@ -66,9 +85,14 @@ public class ZKMetadataUtils {
}
if (segmentMetadata.getTimeInterval() != null) {
- segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
- segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
- segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+
segmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
+
segmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
+ segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
+ ColumnMetadata timeColumnMetadata =
segmentMetadata.getColumnMetadataFor(segmentMetadata.getTimeColumn());
+ if (isValidTimeMetadata(timeColumnMetadata)) {
+
segmentZKMetadata.setRawStartTime(timeColumnMetadata.getMinValue().toString());
+
segmentZKMetadata.setRawEndTime(timeColumnMetadata.getMaxValue().toString());
+ }
} else {
segmentZKMetadata.setStartTime(-1);
segmentZKMetadata.setEndTime(-1);
@@ -128,4 +152,9 @@ public class ZKMetadataUtils {
}
}
}
+
+  /**
+   * Returns whether the time column metadata is present and carries usable min/max values from which the raw
+   * segment start/end time can be recorded.
+   */
+  private static boolean isValidTimeMetadata(ColumnMetadata timeColumnMetadata) {
+    if (timeColumnMetadata == null) {
+      return false;
+    }
+    if (timeColumnMetadata.getMinValue() == null || timeColumnMetadata.getMaxValue() == null) {
+      return false;
+    }
+    return !timeColumnMetadata.isMinMaxValueInvalid();
+  }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index e8e8bcd6a1..2e1ea547ac 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
@@ -66,10 +67,15 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -102,6 +108,10 @@ public class PinotHelixResourceManagerTest {
private static final String OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME
=
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+ private static final String SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME =
"segmentsMetadataUpdateTestTable";
+ private static final String OFFLINE_SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME
=
+
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME);
+
private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
private static final long TIMEOUT_IN_MS = 60_000L;
@@ -308,6 +318,42 @@ public class PinotHelixResourceManagerTest {
}
}
+  /**
+   * Verifies that updating the ZK time interval converts the raw (string-formatted) start/end time of a segment into
+   * milliseconds using the supplied time column format.
+   */
+  @Test
+  public void testUpdateSchemaDateTime()
+      throws Exception {
+    String segmentName = "testSegmentDateTime";
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
+
+    long curr = System.currentTimeMillis();
+    // Raw values use the same pattern as the time column spec below, printed in UTC.
+    DateTimeFormatter dateTimeFormat = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
+        .toFormatter().withZone(DateTimeZone.UTC);
+    String rawTime = dateTimeFormat.print(curr);
+    segmentZKMetadata.setRawStartTime(rawTime);
+    segmentZKMetadata.setRawEndTime(rawTime);
+    segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
+    ZKMetadataProvider.setSegmentZKMetadata(TEST_INSTANCE.getPropertyStore(),
+        OFFLINE_SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME, segmentZKMetadata);
+
+    // Before the update only the raw values are set; start/end time are still unset (-1).
+    List<SegmentZKMetadata> retrievedSegmentZKMetadataList =
+        TEST_INSTANCE.getHelixResourceManager().getSegmentsZKMetadata(OFFLINE_SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME);
+    // Check the size first so an empty result fails with a clear message instead of IndexOutOfBoundsException.
+    Assert.assertEquals(retrievedSegmentZKMetadataList.size(), 1);
+    SegmentZKMetadata retrievedSegmentZKMetadata = retrievedSegmentZKMetadataList.get(0);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getSegmentName(), segmentName);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getStartTimeMs(), -1);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getEndTimeMs(), -1);
+
+    DateTimeFieldSpec timeColumnFieldSpec = new DateTimeFieldSpec("timestamp", FieldSpec.DataType.STRING,
+        "SIMPLE_DATE_FORMAT|yyyy-MM-dd'T'HH:mm:ss.SSS", "1:MILLISECONDS");
+    TEST_INSTANCE.getHelixResourceManager()
+        .updateSegmentsZKTimeInterval(OFFLINE_SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME, timeColumnFieldSpec);
+
+    retrievedSegmentZKMetadataList =
+        TEST_INSTANCE.getHelixResourceManager().getSegmentsZKMetadata(OFFLINE_SEGMENTS_METADATA_UPDATE_TEST_TABLE_NAME);
+    Assert.assertEquals(retrievedSegmentZKMetadataList.size(), 1);
+    retrievedSegmentZKMetadata = retrievedSegmentZKMetadataList.get(0);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getSegmentName(), segmentName);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getStartTimeMs(), curr);
+    Assert.assertEquals(retrievedSegmentZKMetadata.getEndTimeMs(), curr);
+  }
+
@Test
void testRetrieveTenantNames() {
// Create broker tenant on 1 Broker
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 897ce9fcea..6c769d620a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -46,8 +46,11 @@ public class SegmentMetadataMockUtils {
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(numTotalDocs);
Mockito.when(segmentMetadata.getCrc()).thenReturn(crc);
+ Mockito.when(segmentMetadata.getStartTime()).thenReturn(1L);
Mockito.when(segmentMetadata.getEndTime()).thenReturn(10L);
- Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(new Interval(0,
20));
+ Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(
+ new Interval(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS),
+ TimeUnit.MILLISECONDS.convert(10, TimeUnit.DAYS)));
Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
return segmentMetadata;
}
@@ -96,8 +99,11 @@ public class SegmentMetadataMockUtils {
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(10);
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
+ Mockito.when(segmentMetadata.getStartTime()).thenReturn(endTime - 10);
Mockito.when(segmentMetadata.getEndTime()).thenReturn(endTime);
- Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(new
Interval(endTime - 10, endTime + 10));
+ Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(
+ new Interval(TimeUnit.MILLISECONDS.convert(endTime - 10,
TimeUnit.DAYS),
+ TimeUnit.MILLISECONDS.convert(endTime, TimeUnit.DAYS)));
Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
return segmentMetadata;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 44a5b37cd8..25973c3b34 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -738,6 +738,8 @@ public class CommonConstants {
public static final String START_TIME = "segment.start.time";
public static final String END_TIME = "segment.end.time";
+ public static final String RAW_START_TIME = "segment.start.time.raw";
+ public static final String RAW_END_TIME = "segment.end.time.raw";
public static final String TIME_UNIT = "segment.time.unit";
public static final String INDEX_VERSION = "segment.index.version";
public static final String TOTAL_DOCS = "segment.total.docs";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]