xiangfu0 commented on a change in pull request #7255:
URL: https://github.com/apache/pinot/pull/7255#discussion_r687269751
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1742,37 +1736,36 @@ public ZNRecord getSegmentMetadataZnRecord(String
tableNameWithType, String segm
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName));
}
- public boolean updateZkMetadata(String offlineTableName,
OfflineSegmentZKMetadata segmentMetadata,
- int expectedVersion) {
+ public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata
segmentZKMetadata, int expectedVersion) {
return ZKMetadataProvider
- .setOfflineSegmentZKMetadata(_propertyStore, offlineTableName,
segmentMetadata, expectedVersion);
+ .setSegmentZKMetadata(_propertyStore, tableNameWithType,
segmentZKMetadata, expectedVersion);
}
- public boolean updateZkMetadata(String offlineTableName,
OfflineSegmentZKMetadata segmentMetadata) {
- return ZKMetadataProvider.setOfflineSegmentZKMetadata(_propertyStore,
offlineTableName, segmentMetadata);
+ public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata
segmentZKMetadata) {
+ return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentZKMetadata);
}
- public void refreshSegment(String offlineTableName, SegmentMetadata
segmentMetadata,
- OfflineSegmentZKMetadata offlineSegmentZKMetadata, String downloadUrl,
@Nullable String crypter) {
+ public void refreshSegment(String tableNameWithType, SegmentMetadata
segmentMetadata,
+ SegmentZKMetadata segmentZKMetadata, String downloadUrl, @Nullable
String crypter) {
String segmentName = segmentMetadata.getName();
// NOTE: Must first set the segment ZK metadata before trying to refresh
because servers and brokers rely on segment
// ZK metadata to refresh the segment (server will compare the segment ZK
metadata with the local metadata to decide
// whether to download the new segment; broker will update the segment
partition info & time boundary based on
// the segment ZK metadata)
ZKMetadataUtils
- .updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata,
CommonConstants.Segment.SegmentType.OFFLINE);
- offlineSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
- offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
- offlineSegmentZKMetadata.setCrypterName(crypter);
- if (!ZKMetadataProvider.setOfflineSegmentZKMetadata(_propertyStore,
offlineTableName, offlineSegmentZKMetadata)) {
+ .updateSegmentMetadata(segmentZKMetadata, segmentMetadata,
CommonConstants.Segment.SegmentType.OFFLINE);
Review comment:
do we still need segment type here?
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
##########
@@ -36,209 +34,302 @@
import org.slf4j.LoggerFactory;
-public abstract class SegmentZKMetadata implements ZKMetadata {
+public class SegmentZKMetadata implements ZKMetadata {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentZKMetadata.class);
+ private static final String NULL = "null";
- protected static final String NULL = "null";
-
- private String _segmentName;
- private SegmentType _segmentType;
- private long _startTime = -1;
- private long _endTime = -1;
- private TimeUnit _timeUnit;
- private String _indexVersion;
- private long _totalDocs = -1;
- private long _crc = -1;
- private long _creationTime = -1;
- private SegmentPartitionMetadata _partitionMetadata;
- private long _segmentUploadStartTime = -1;
- private String _crypterName;
- private Map<String, String> _customMap;
+ private final ZNRecord _znRecord;
+ private Map<String, String> _simpleFields;
- @Deprecated
- private String _rawTableName;
+ // Cache start/end time because they can be used to sort the metadata
+ private long _startTimeMs = -1;
+ private long _endTimeMs = -1;
- public SegmentZKMetadata() {
+ public SegmentZKMetadata(String segmentName) {
+ _znRecord = new ZNRecord(segmentName);
+ _simpleFields = _znRecord.getSimpleFields();
+ // TODO: Remove this field after releasing 0.9.0
+ _simpleFields.put(Segment.SEGMENT_NAME, segmentName);
}
public SegmentZKMetadata(ZNRecord znRecord) {
- _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME);
- _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE,
SegmentType.class, SegmentType.OFFLINE);
- _startTime = znRecord.getLongField(Segment.START_TIME, -1);
- _endTime = znRecord.getLongField(Segment.END_TIME, -1);
- String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT);
- if (timeUnitString != null && !timeUnitString.equals(NULL)) {
- _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class,
TimeUnit.DAYS);
- }
- _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION);
- _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1);
- _crc = znRecord.getLongField(Segment.CRC, -1);
- _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1);
- try {
- String partitionMetadataJson =
znRecord.getSimpleField(Segment.PARTITION_METADATA);
- if (partitionMetadataJson != null) {
- _partitionMetadata =
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
- }
- } catch (IOException e) {
- LOGGER.error(
- "Exception caught while reading partition info from zk metadata for
segment '{}', partition info dropped.",
- _segmentName, e);
- }
- _segmentUploadStartTime =
znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1);
- _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME);
- _customMap = znRecord.getMapField(Segment.CUSTOM_MAP);
-
- // For backward-compatibility
- setTableName(znRecord.getSimpleField(Segment.TABLE_NAME));
+ _znRecord = znRecord;
+ _simpleFields = znRecord.getSimpleFields();
}
public String getSegmentName() {
- return _segmentName;
- }
-
- public void setSegmentName(String segmentName) {
- _segmentName = segmentName;
- }
-
- public SegmentType getSegmentType() {
- return _segmentType;
- }
-
- public void setSegmentType(SegmentType segmentType) {
- _segmentType = segmentType;
+ return _znRecord.getId();
}
public long getStartTimeMs() {
- if (_startTime > 0 && _timeUnit != null) {
- return _timeUnit.toMillis(_startTime);
- } else {
- return -1;
+ if (_startTimeMs < 0) {
Review comment:
use <= ?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
##########
@@ -377,10 +377,9 @@ public void run() {
try {
_segmentLogger.info("Marking current segment as completed in
Helix");
- RealtimeSegmentZKMetadata metadataToOverwrite = new
RealtimeSegmentZKMetadata();
+ SegmentZKMetadata metadataToOverwrite = new
SegmentZKMetadata(segmentZKMetadata.getSegmentName());
metadataToOverwrite.setTableName(_tableNameWithType);
-
metadataToOverwrite.setSegmentName(realtimeSegmentZKMetadata.getSegmentName());
- metadataToOverwrite.setSegmentType(SegmentType.OFFLINE);
Review comment:
I think we used to claim only the consuming segment as a real-time segment, and
all persisted (completed) segments are of `OFFLINE` type.
If we don't have any logic relying on this, then it should be fine — e.g.
segment retention.
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
##########
@@ -36,209 +34,302 @@
import org.slf4j.LoggerFactory;
-public abstract class SegmentZKMetadata implements ZKMetadata {
+public class SegmentZKMetadata implements ZKMetadata {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentZKMetadata.class);
+ private static final String NULL = "null";
- protected static final String NULL = "null";
-
- private String _segmentName;
- private SegmentType _segmentType;
- private long _startTime = -1;
- private long _endTime = -1;
- private TimeUnit _timeUnit;
- private String _indexVersion;
- private long _totalDocs = -1;
- private long _crc = -1;
- private long _creationTime = -1;
- private SegmentPartitionMetadata _partitionMetadata;
- private long _segmentUploadStartTime = -1;
- private String _crypterName;
- private Map<String, String> _customMap;
+ private final ZNRecord _znRecord;
+ private Map<String, String> _simpleFields;
- @Deprecated
- private String _rawTableName;
+ // Cache start/end time because they can be used to sort the metadata
+ private long _startTimeMs = -1;
+ private long _endTimeMs = -1;
- public SegmentZKMetadata() {
+ public SegmentZKMetadata(String segmentName) {
+ _znRecord = new ZNRecord(segmentName);
+ _simpleFields = _znRecord.getSimpleFields();
+ // TODO: Remove this field after releasing 0.9.0
+ _simpleFields.put(Segment.SEGMENT_NAME, segmentName);
}
public SegmentZKMetadata(ZNRecord znRecord) {
- _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME);
- _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE,
SegmentType.class, SegmentType.OFFLINE);
- _startTime = znRecord.getLongField(Segment.START_TIME, -1);
- _endTime = znRecord.getLongField(Segment.END_TIME, -1);
- String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT);
- if (timeUnitString != null && !timeUnitString.equals(NULL)) {
- _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class,
TimeUnit.DAYS);
- }
- _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION);
- _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1);
- _crc = znRecord.getLongField(Segment.CRC, -1);
- _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1);
- try {
- String partitionMetadataJson =
znRecord.getSimpleField(Segment.PARTITION_METADATA);
- if (partitionMetadataJson != null) {
- _partitionMetadata =
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
- }
- } catch (IOException e) {
- LOGGER.error(
- "Exception caught while reading partition info from zk metadata for
segment '{}', partition info dropped.",
- _segmentName, e);
- }
- _segmentUploadStartTime =
znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1);
- _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME);
- _customMap = znRecord.getMapField(Segment.CUSTOM_MAP);
-
- // For backward-compatibility
- setTableName(znRecord.getSimpleField(Segment.TABLE_NAME));
+ _znRecord = znRecord;
+ _simpleFields = znRecord.getSimpleFields();
}
public String getSegmentName() {
- return _segmentName;
- }
-
- public void setSegmentName(String segmentName) {
- _segmentName = segmentName;
- }
-
- public SegmentType getSegmentType() {
- return _segmentType;
- }
-
- public void setSegmentType(SegmentType segmentType) {
- _segmentType = segmentType;
+ return _znRecord.getId();
}
public long getStartTimeMs() {
- if (_startTime > 0 && _timeUnit != null) {
- return _timeUnit.toMillis(_startTime);
- } else {
- return -1;
+ if (_startTimeMs < 0) {
+ String startTimeString = _simpleFields.get(Segment.START_TIME);
+ if (startTimeString != null) {
+ _startTimeMs =
TimeUnit.valueOf(_simpleFields.get(Segment.TIME_UNIT)).toMillis(Long.parseLong(startTimeString));
+ }
}
- }
-
- public void setStartTime(long startTime) {
- _startTime = startTime;
+ return _startTimeMs;
}
public long getEndTimeMs() {
- if (_endTime > 0 && _timeUnit != null) {
- return _timeUnit.toMillis(_endTime);
- } else {
- return -1;
+ if (_endTimeMs < 0) {
Review comment:
use <= ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]