This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 612e327b51fb refactor: Add Lombok annotations to hudi-common module
(part 7) (#18944)
612e327b51fb is described below
commit 612e327b51fbb4fc792dcaeecf47aa86f5ddab89
Author: voonhous <[email protected]>
AuthorDate: Fri Jun 12 09:53:08 2026 +0800
refactor: Add Lombok annotations to hudi-common module (part 7) (#18944)
* refactor: Add Lombok annotations to hudi-common module (part 7)
* refactor: Address review comments
- HoodieLogBlock: revert Lombok @NonNull to javax.annotation.Nonnull; a null
header/footer is expected on the read path (v2/v3 blocks have no footer),
so Lombok's runtime null checks would throw an NPE on every log block read
- IncrementalQueryAnalyzer: make QueryContext constructor private again via
@AllArgsConstructor(access = AccessLevel.PRIVATE); fix EMPTY constant to
pass Option.empty() instead of null
- HoodieTableConfig: use parameterized logging instead of String.format
- HoodieTableVersion: document why versionCode uses
@Accessors(fluent = true)
---
.../hudi/common/table/HoodieTableConfig.java | 24 ++--
.../hudi/common/table/HoodieTableMetaClient.java | 159 +++++++--------------
.../hudi/common/table/HoodieTableVersion.java | 24 ++--
.../hudi/common/table/TableSchemaResolver.java | 12 +-
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 6 +-
.../hudi/common/table/cdc/HoodieCDCFileSplit.java | 24 +---
.../hudi/common/table/cdc/HoodieCDCOperation.java | 14 +-
.../hudi/common/table/checkpoint/Checkpoint.java | 21 ++-
.../table/log/AbstractHoodieLogRecordScanner.java | 92 ++++--------
.../table/log/BaseHoodieLogRecordReader.java | 103 +++++--------
.../apache/hudi/common/table/log/FullKeySpec.java | 15 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 26 ++--
.../hudi/common/table/log/HoodieLogFormat.java | 12 +-
.../common/table/log/HoodieLogFormatReader.java | 8 +-
.../table/log/HoodieMergedLogRecordReader.java | 20 +--
.../table/log/HoodieMergedLogRecordScanner.java | 20 +--
.../apache/hudi/common/table/log/InstantRange.java | 17 +--
.../common/table/log/block/HoodieCommandBlock.java | 7 +-
.../common/table/log/block/HoodieDataBlock.java | 11 +-
.../common/table/log/block/HoodieDeleteBlock.java | 6 +-
.../table/log/block/HoodieHFileDataBlock.java | 2 +
.../common/table/log/block/HoodieLogBlock.java | 94 +++---------
.../hudi/common/table/read/BufferedRecord.java | 47 ++----
.../table/read/BufferedRecordMergerFactory.java | 7 +-
.../hudi/common/table/read/DeleteContext.java | 22 +--
.../table/read/IncrementalQueryAnalyzer.java | 70 +++------
.../buffer/DefaultFileGroupRecordBufferLoader.java | 8 +-
.../table/read/buffer/FileGroupRecordBuffer.java | 19 +--
.../buffer/PositionBasedFileGroupRecordBuffer.java | 15 +-
.../source/split/HoodieSourceSplitSerializer.java | 12 +-
.../hudi/source/TestIncrementalInputSplits.java | 4 +-
.../source/TestStreamReadMonitoringFunction.java | 4 +-
.../hudi/source/split/TestHoodieSourceSplit.java | 14 +-
.../split/TestHoodieSourceSplitSerializer.java | 76 +++++-----
34 files changed, 359 insertions(+), 656 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index ca775136257c..73703d70d85a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -65,8 +65,7 @@ import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.concurrent.Immutable;
@@ -119,6 +118,7 @@ import static
org.apache.hudi.common.util.StringUtils.nonEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@Immutable
+@Slf4j
@ConfigClassProperty(name = "Hudi Table Basic Configs",
groupName = ConfigGroups.Names.TABLE_CONFIG,
description = "Configurations of the Hudi Table like type of ingestion,
storage formats, hive table name etc."
@@ -126,8 +126,6 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkArgument;
+ " initializing a path as hoodie base path and never changes during
the lifetime of a hoodie table.")
public class HoodieTableConfig extends HoodieConfig {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieTableConfig.class);
-
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP =
"hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY =
"hoodie.datasource.write.table.name";
@@ -475,7 +473,7 @@ public class HoodieTableConfig extends HoodieConfig {
public HoodieTableConfig(HoodieStorage storage, StoragePath metaPath) {
super();
StoragePath propertyPath = new StoragePath(metaPath,
HOODIE_PROPERTIES_FILE);
- LOG.info("Loading table properties from " + propertyPath);
+ log.info("Loading table properties from {}", propertyPath);
try {
this.props = fetchConfigs(storage, metaPath, HOODIE_PROPERTIES_FILE,
HOODIE_PROPERTIES_FILE_BACKUP, MAX_READ_RETRIES, READ_RETRY_DELAY_MSEC);
} catch (IOException e) {
@@ -509,7 +507,7 @@ public class HoodieTableConfig extends HoodieConfig {
checksum = propsWithChecksum.getProperty(TABLE_CHECKSUM.key());
props.setProperty(TABLE_CHECKSUM.key(), checksum);
}
- LOG.info("Created properties file at " + propertyPath);
+ log.info("Created properties file at {}", propertyPath);
return checksum;
}
@@ -556,7 +554,7 @@ public class HoodieTableConfig extends HoodieConfig {
propsToDelete.forEach(propToDelete -> props.remove(propToDelete));
checksum = storeProperties(props, out, cfgPath);
}
- LOG.warn(String.format("%s modified to: %s (at %s)", cfgPath.getName(),
props, cfgPath.getParent()));
+ log.warn("{} modified to: {} (at {})", cfgPath.getName(), props,
cfgPath.getParent());
// 5. verify and remove backup.
try (InputStream in = storage.open(cfgPath)) {
@@ -582,7 +580,7 @@ public class HoodieTableConfig extends HoodieConfig {
private static void deleteFile(HoodieStorage storage, StoragePath cfgPath)
throws IOException {
storage.deleteFile(cfgPath);
- LOG.info("Deleted properties file at " + cfgPath);
+ log.info("Deleted properties file at {}", cfgPath);
}
/**
@@ -685,7 +683,7 @@ public class HoodieTableConfig extends HoodieConfig {
boolean valid = tableVersion.greaterThan(firstVersion) ||
tableVersion.equals(firstVersion);
valid = valid ||
CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES.contains(configProperty.key());
if (!valid) {
- LOG.warn("Table version {} is lower than or equal to config's first
version {}. Config {} will be ignored.",
+ log.warn("Table version {} is lower than or equal to config's first
version {}. Config {} will be ignored.",
tableVersion, firstVersion, configProperty.key());
}
return valid;
@@ -1009,12 +1007,12 @@ public class HoodieTableConfig extends HoodieConfig {
// Check ordering field name based on record merge mode
if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
if (nonEmpty(orderingFieldNamesAsString)) {
- LOG.warn("The ordering field ({}) is specified. COMMIT_TIME_ORDERING "
+ log.warn("The ordering field ({}) is specified. COMMIT_TIME_ORDERING "
+ "merge mode does not use ordering field anymore.",
orderingFieldNamesAsString);
}
} else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
if (isNullOrEmpty(orderingFieldNamesAsString)) {
- LOG.warn("The ordering field is not specified. EVENT_TIME_ORDERING "
+ log.warn("The ordering field is not specified. EVENT_TIME_ORDERING "
+ "merge mode requires ordering field to be set for getting the "
+ "event time. Using commit time-based ordering now.");
}
@@ -1324,7 +1322,7 @@ public class HoodieTableConfig extends HoodieConfig {
setValue(TABLE_METADATA_PARTITIONS,
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
setValue(TABLE_METADATA_PARTITIONS_INFLIGHT,
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
- LOG.info("MDT {} partition {} has been {}", metaClient.getBasePath(),
partitionPath, enabled ? "enabled" : "disabled");
+ log.info("MDT {} partition {} has been {}", metaClient.getBasePath(),
partitionPath, enabled ? "enabled" : "disabled");
}
/**
@@ -1342,7 +1340,7 @@ public class HoodieTableConfig extends HoodieConfig {
setValue(TABLE_METADATA_PARTITIONS_INFLIGHT,
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
- LOG.info("MDT {} partitions {} have been set to inflight",
metaClient.getBasePath(), partitionPaths);
+ log.info("MDT {} partitions {} have been set to inflight",
metaClient.getBasePath(), partitionPaths);
}
public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient,
MetadataPartitionType... partitionTypes) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index d724dda09f06..255c1d401f51 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -74,8 +74,11 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
@@ -118,10 +121,12 @@ import static
org.apache.hudi.metadata.HoodieIndexVersion.isValidIndexDefinition
* @see HoodieTimeline
* @since 0.3.0
*/
+@NoArgsConstructor
+@Getter
+@Slf4j
public class HoodieTableMetaClient implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieTableMetaClient.class);
public static final String METADATA_STR = "metadata";
public static final String METAFOLDER_NAME = ".hoodie";
public static final String TIMELINEFOLDER_NAME = "timeline";
@@ -163,20 +168,27 @@ public class HoodieTableMetaClient implements
Serializable {
protected StoragePath basePath;
protected StoragePath metaPath;
+ @Getter(AccessLevel.NONE)
+ @Setter
private transient HoodieStorage storage;
+ @Getter(AccessLevel.NONE)
private boolean loadActiveTimelineOnLoad;
protected StorageConfiguration<?> storageConf;
private HoodieTableType tableType;
private TimelineLayoutVersion timelineLayoutVersion;
private TimelineLayout timelineLayout;
private StoragePath timelinePath;
+ @Getter(AccessLevel.NONE)
private StoragePath timelineHistoryPath;
protected HoodieTableConfig tableConfig;
+ @Getter(AccessLevel.NONE)
protected HoodieActiveTimeline activeTimeline;
private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig =
FileSystemRetryConfig.newBuilder().build();
+ @Getter(AccessLevel.NONE)
protected HoodieMetaserverConfig metaserverConfig;
private HoodieTimeGeneratorConfig timeGeneratorConfig;
+ @Getter(AccessLevel.NONE)
private Option<HoodieIndexMetadata> indexMetadataOpt;
private HoodieTableFormat tableFormat;
@@ -187,7 +199,7 @@ public class HoodieTableMetaClient implements Serializable {
protected HoodieTableMetaClient(HoodieStorage storage, String basePath,
boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig
consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
HoodieTimeGeneratorConfig
timeGeneratorConfig, FileSystemRetryConfig fileSystemRetryConfig) {
- LOG.debug("Loading HoodieTableMetaClient from " + basePath);
+ log.debug("Loading HoodieTableMetaClient from {}", basePath);
this.timeGeneratorConfig = timeGeneratorConfig;
this.consistencyGuardConfig = consistencyGuardConfig;
this.fileSystemRetryConfig = fileSystemRetryConfig;
@@ -213,21 +225,13 @@ public class HoodieTableMetaClient implements
Serializable {
this.timelinePath =
timelineLayout.getTimelinePathProvider().getTimelinePath(tableConfig,
this.basePath);
this.timelineHistoryPath =
timelineLayout.getTimelinePathProvider().getTimelineHistoryPath(tableConfig,
this.basePath);
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
- LOG.debug("Finished Loading Table of type " + tableType + "(version=" +
timelineLayoutVersion + ") from " + basePath);
+ log.debug("Finished Loading Table of type {}(version={}) from {}",
tableType, timelineLayoutVersion, basePath);
if (loadActiveTimelineOnLoad) {
- LOG.info("Loading Active commit timeline for " + basePath);
+ log.info("Loading Active commit timeline for {}", basePath);
getActiveTimeline();
}
}
- /**
- * For serializing and de-serializing.
- *
- * @deprecated
- */
- public HoodieTableMetaClient() {
- }
-
public String getIndexDefinitionPath() {
return tableConfig.getRelativeIndexDefinitionPath()
.map(definitionPath -> new StoragePath(basePath,
definitionPath).toString())
@@ -250,7 +254,7 @@ public class HoodieTableMetaClient implements Serializable {
Option<HoodieIndexDefinition> existingIndexOpt =
indexMetadataOpt.get().getIndex(indexName);
if (existingIndexOpt.isPresent()) {
if
(!existingIndexOpt.get().getSourceFields().equals(indexDefinition.getSourceFields()))
{
- LOG.info("List of columns to index is changing. Old value {}. New
value {}", existingIndexOpt.get().getSourceFields(),
+ log.info("List of columns to index is changing. Old value {}. New
value {}", existingIndexOpt.get().getSourceFields(),
indexDefinition.getSourceFields());
indexMetadataOpt.get().getIndexDefinitions().put(indexName,
indexDefinition);
} else {
@@ -386,35 +390,6 @@ public class HoodieTableMetaClient implements Serializable
{
out.defaultWriteObject();
}
- /**
- * Returns base path of the table
- */
- public StoragePath getBasePath() {
- return basePath; // this invocation is cached
- }
-
- /**
- * @return Hoodie Table Type
- */
- public HoodieTableType getTableType() {
- return tableType;
- }
-
- /**
- * @return Meta path
- */
- public StoragePath getMetaPath() {
- return metaPath;
- }
-
- public StoragePath getTimelinePath() {
- return timelinePath;
- }
-
- public HoodieTableFormat getTableFormat() {
- return tableFormat;
- }
-
/**
* @return schema folder path
*/
@@ -492,21 +467,6 @@ public class HoodieTableMetaClient implements Serializable
{
return timelineHistoryPath;
}
- /**
- * @return Table Config
- */
- public HoodieTableConfig getTableConfig() {
- return tableConfig;
- }
-
- public TimelineLayoutVersion getTimelineLayoutVersion() {
- return timelineLayoutVersion;
- }
-
- public TimelineLayout getTimelineLayout() {
- return timelineLayout;
- }
-
public boolean isMetadataTable() {
return HoodieTableMetadata.isMetadataTable(getBasePath());
}
@@ -540,18 +500,10 @@ public class HoodieTableMetaClient implements
Serializable {
consistencyGuard);
}
- public void setStorage(HoodieStorage storage) {
- this.storage = storage;
- }
-
public HoodieStorage getRawStorage() {
return getStorage().getRawStorage();
}
- public StorageConfiguration<?> getStorageConf() {
- return storageConf;
- }
-
/**
* Get the active instants as a timeline.
*
@@ -621,18 +573,6 @@ public class HoodieTableMetaClient implements Serializable
{
return TimelineUtils.generateInstantTime(shouldLock, timeGenerator);
}
- public HoodieTimeGeneratorConfig getTimeGeneratorConfig() {
- return timeGeneratorConfig;
- }
-
- public ConsistencyGuardConfig getConsistencyGuardConfig() {
- return consistencyGuardConfig;
- }
-
- public FileSystemRetryConfig getFileSystemRetryConfig() {
- return fileSystemRetryConfig;
- }
-
/**
* Get the archived commits as a timeline. This is costly operation, as all
data from the archived files are read.
* This should not be used, unless for historical debugging purposes.
@@ -696,7 +636,7 @@ public class HoodieTableMetaClient implements Serializable {
Properties props,
Integer timelineLayout,
boolean
shouldCreateTableConfig) throws IOException {
- LOG.info("Initializing {} as hoodie table", basePath);
+ log.info("Initializing {} as hoodie table", basePath);
final HoodieStorage storage = HoodieStorageUtils.getStorage(basePath,
storageConf);
if (!storage.exists(basePath)) {
storage.createDirectory(basePath);
@@ -871,31 +811,6 @@ public class HoodieTableMetaClient implements Serializable
{
return instantStream.sorted().collect(Collectors.toList());
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HoodieTableMetaClient that = (HoodieTableMetaClient) o;
- return Objects.equals(basePath, that.basePath) && tableType ==
that.tableType;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(basePath, tableType);
- }
-
- @Override
- public String toString() {
- return "HoodieTableMetaClient{" + "basePath='" + basePath + '\''
- + ", metaPath='" + metaPath + '\''
- + ", tableType=" + tableType
- + '}';
- }
-
public void initializeBootstrapDirsIfNotExists() throws IOException {
initializeBootstrapDirsIfNotExists(basePath, getStorage());
}
@@ -1037,6 +952,34 @@ public class HoodieTableMetaClient implements
Serializable {
return getTimelineLayout().getCommitMetadataSerDe();
}
+ // Not using Lombok @EqualsAndHashCode/@ToString here: this class is
subclassed (e.g. HoodieTableMetaserverClient),
+ // and we rely on the runtime subtype - exact-class matching in equals() and
the declaring-class behavior below.
+ // Lombok would switch equals() to instanceof, making a base instance
compare equal to a subtype instance.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieTableMetaClient that = (HoodieTableMetaClient) o;
+ return Objects.equals(basePath, that.basePath) && tableType ==
that.tableType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(basePath, tableType);
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieTableMetaClient{" + "basePath='" + basePath + '\''
+ + ", metaPath='" + metaPath + '\''
+ + ", tableType=" + tableType
+ + '}';
+ }
+
public static TableBuilder newTableBuilder() {
return new TableBuilder();
}
@@ -1044,6 +987,7 @@ public class HoodieTableMetaClient implements Serializable
{
/**
* Builder for {@link Properties}.
*/
+ @NoArgsConstructor(access = AccessLevel.PACKAGE)
public static class TableBuilder {
private HoodieTableType tableType;
@@ -1093,9 +1037,6 @@ public class HoodieTableMetaClient implements
Serializable {
*/
private final Properties others = new Properties();
- TableBuilder() {
- }
-
public TableBuilder setTableType(HoodieTableType tableType) {
this.tableType = tableType;
return this;
@@ -1667,7 +1608,7 @@ public class HoodieTableMetaClient implements
Serializable {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(basePath)
.setMetaserverConfig(props)
.build();
- LOG.info("Finished initializing Table of type {} from {}",
metaClient.getTableConfig().getTableType(), basePath);
+ log.info("Finished initializing Table of type {} from {}",
metaClient.getTableConfig().getTableType(), basePath);
return metaClient;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
index 945ed2366607..7ec8445809c9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
@@ -22,6 +22,11 @@ import
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
import java.util.Arrays;
import java.util.List;
@@ -29,7 +34,10 @@ import java.util.List;
* Table's version that controls what version of writer/readers can actually
read/write
* to a given table.
*/
+@AllArgsConstructor
+@Getter
public enum HoodieTableVersion {
+
// < 0.6.0 versions
ZERO(0, CollectionUtils.createImmutableList("0.3.0"),
TimelineLayoutVersion.LAYOUT_VERSION_0),
// 0.6.0 onwards
@@ -51,26 +59,14 @@ public enum HoodieTableVersion {
// 1.1
NINE(9, CollectionUtils.createImmutableList("1.1.0"),
TimelineLayoutVersion.LAYOUT_VERSION_2);
+ @Accessors(fluent = true) // Required so that #versionCode() is generated
instead of #getVersionCode() by Lombok
private final int versionCode;
+ @Getter(AccessLevel.NONE)
private final List<String> releaseVersions;
private final TimelineLayoutVersion timelineLayoutVersion;
- HoodieTableVersion(int versionCode, List<String> releaseVersions,
TimelineLayoutVersion timelineLayoutVersion) {
- this.versionCode = versionCode;
- this.releaseVersions = releaseVersions;
- this.timelineLayoutVersion = timelineLayoutVersion;
- }
-
- public TimelineLayoutVersion getTimelineLayoutVersion() {
- return timelineLayoutVersion;
- }
-
- public int versionCode() {
- return versionCode;
- }
-
public static HoodieTableVersion current() {
return NINE;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index ee2cfbf0d362..157f9d4c79ca 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -49,8 +49,7 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.concurrent.ThreadSafe;
@@ -67,11 +66,10 @@ import java.util.stream.Stream;
/**
* Helper class to read schema from data files and log files and to convert it
between different formats.
*/
+@Slf4j
@ThreadSafe
public class TableSchemaResolver {
- private static final Logger LOG =
LoggerFactory.getLogger(TableSchemaResolver.class);
-
protected final HoodieTableMetaClient metaClient;
/**
@@ -254,11 +252,11 @@ public class TableSchemaResolver {
.map(writeStat -> new StoragePath(metaClient.getBasePath(),
writeStat.getPath()));
return Option.of(fetchSchemaFromFiles(filePaths));
} else {
- LOG.debug("Could not find any data file written for commit, so could
not get schema for table {}", metaClient.getBasePath());
+ log.debug("Could not find any data file written for commit, so could
not get schema for table {}", metaClient.getBasePath());
return Option.empty();
}
default:
- LOG.error("Unknown table type {}", metaClient.getTableType());
+ log.error("Unknown table type {}", metaClient.getTableType());
throw new InvalidTableException(metaClient.getBasePath().toString());
}
}
@@ -373,7 +371,7 @@ public class TableSchemaResolver {
HoodieSchema tableSchema = getTableSchemaFromDataFile();
return
tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent();
} catch (Exception e) {
- LOG.info("Failed to read operation field from schema ({})",
e.getMessage());
+ log.info("Failed to read operation field from schema ({})",
e.getMessage());
return false;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 612837c4dc4d..beb9b56bc66a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -219,9 +219,9 @@ public class HoodieCDCExtractor {
try {
Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION));
HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
- if (instantRange.getStartInstant().isPresent() &&
!metaClient.getArchivedTimeline().empty()
- &&
InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(),
InstantComparison.GREATER_THAN, instantRange.getStartInstant().get())) {
- throw new HoodieException("Start instant time " +
instantRange.getStartInstant().get()
+ if (instantRange.getStartInstantOpt().isPresent() &&
!metaClient.getArchivedTimeline().empty()
+ &&
InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(),
InstantComparison.GREATER_THAN, instantRange.getStartInstantOpt().get())) {
+ throw new HoodieException("Start instant time " +
instantRange.getStartInstantOpt().get()
+ " for CDC query has to be in the active timeline. Beginning of
active timeline " + activeTimeLine.firstInstant().get().requestedTime());
}
this.commits = activeTimeLine.getInstantsAsStream()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index c33df8414bd2..24fb94356a0d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.util.Option;
+import lombok.Getter;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
@@ -45,7 +47,9 @@ import java.util.stream.Collectors;
* For `cdcInferCase` = {@link HoodieCDCInferenceCase#REPLACE_COMMIT},
`cdcFile` is null,
* `beforeFileSlice` is the current version of the file slice.
*/
+@Getter
public class HoodieCDCFileSplit implements Serializable,
Comparable<HoodieCDCFileSplit> {
+
/**
* The instant time at which the changes happened.
*/
@@ -103,26 +107,6 @@ public class HoodieCDCFileSplit implements Serializable,
Comparable<HoodieCDCFil
this.afterFileSlice = afterFileSlice;
}
- public String getInstant() {
- return this.instant;
- }
-
- public HoodieCDCInferenceCase getCdcInferCase() {
- return this.cdcInferCase;
- }
-
- public List<String> getCdcFiles() {
- return this.cdcFiles;
- }
-
- public Option<FileSlice> getBeforeFileSlice() {
- return this.beforeFileSlice;
- }
-
- public Option<FileSlice> getAfterFileSlice() {
- return this.afterFileSlice;
- }
-
@Override
public int compareTo(HoodieCDCFileSplit o) {
int cmpResult = this.instant.compareTo(o.instant);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
index 90540bc05a69..2cb0f19687e0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
@@ -20,9 +20,15 @@ package org.apache.hudi.common.table.cdc;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
/**
* Enumeration of change log operation.
*/
+@AllArgsConstructor(access = AccessLevel.PACKAGE)
+@Getter
public enum HoodieCDCOperation {
INSERT("i"),
UPDATE("u"),
@@ -30,14 +36,6 @@ public enum HoodieCDCOperation {
private final String value;
- HoodieCDCOperation(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return this.value;
- }
-
public static HoodieCDCOperation parse(String value) {
switch (value) {
case "i":
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
index 67248ba4adcd..eea1c32dd373 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
@@ -19,6 +19,9 @@
package org.apache.hudi.common.table.checkpoint;
+import lombok.AccessLevel;
+import lombok.Getter;
+
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -27,13 +30,16 @@ import java.util.Objects;
/**
* Class for representing checkpoint
*/
+@Getter
public abstract class Checkpoint implements Serializable {
+
public static final String CHECKPOINT_IGNORE_KEY =
"deltastreamer.checkpoint.ignore_key";
protected String checkpointKey;
protected String checkpointResetKey;
protected String checkpointIgnoreKey;
// These are extra props to be written to the commit metadata
+ @Getter(AccessLevel.NONE)
protected Map<String, String> extraProps = new HashMap<>();
public Checkpoint setCheckpointKey(String newKey) {
@@ -41,21 +47,12 @@ public abstract class Checkpoint implements Serializable {
return this;
}
- public String getCheckpointKey() {
- return checkpointKey;
- }
-
- public String getCheckpointResetKey() {
- return checkpointResetKey;
- }
-
- public String getCheckpointIgnoreKey() {
- return checkpointIgnoreKey;
- }
-
public abstract Map<String, String> getCheckpointCommitMetadata(String
overrideResetKey,
String
overrideIgnoreKey);
+ // Not using Lombok @EqualsAndHashCode/@ToString here: this class is
subclassed, and we rely on
+ // the runtime subtype - exact-class matching in equals() and
getClass().getSimpleName() in toString().
+ // Lombok would bake in the declaring class (Checkpoint) and switch equals()
to instanceof.
@Override
public int hashCode() {
return Objects.hashCode(checkpointKey);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index b1eb9e0e2c98..49c17eda75c8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -48,8 +48,10 @@ import
org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -87,10 +89,9 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* <p>
* This results in two I/O passes over the log file.
*/
+@Slf4j
public abstract class AbstractHoodieLogRecordScanner {
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
-
// Reader schema for the records
protected final HoodieSchema readerSchema;
// Latest valid instant time
@@ -98,14 +99,17 @@ public abstract class AbstractHoodieLogRecordScanner {
private final String latestInstantTime;
protected final HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log
+ @Getter(AccessLevel.PROTECTED)
private final String payloadClassFQN;
// Record's key/partition-path fields
private final String recordKeyField;
private final Option<String> partitionPathFieldOpt;
// Partition name override
+ @Getter
private final Option<String> partitionNameOverrideOpt;
// Stateless component for merging records
protected final HoodieRecordMerger recordMerger;
+ @Getter(AccessLevel.PROTECTED)
private final TypedProperties payloadProps;
// Log File Paths
protected final List<String> logFilePaths;
@@ -117,6 +121,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// optional instant range for incremental block filtering
private final Option<InstantRange> instantRange;
// Read the operation metadata field from the avro record
+ @Getter
private final boolean withOperationField;
private final HoodieStorage storage;
// Total log files read - for metrics
@@ -132,16 +137,19 @@ public abstract class AbstractHoodieLogRecordScanner {
// Total number of corrupt blocks written across all log files
private AtomicLong totalCorruptBlocks = new AtomicLong(0);
// Store the last instant log blocks (needed to implement rollback)
+ @Getter
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
protected final boolean forceFullScan;
// Progress
+ @Getter
private float progress = 0.0f;
// Populate meta fields for the records
private final boolean populateMetaFields;
// Record type read from log block
protected final HoodieRecordType recordType;
// Collect all the block instants after scanning all the log files.
+ @Getter
private final List<String> validBlockInstants = new ArrayList<>();
// table version for compatibility
private final HoodieTableVersion tableVersion;
@@ -301,7 +309,7 @@ public abstract class AbstractHoodieLogRecordScanner {
*/
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
- LOG.info("Scanning log file {}", logFile);
+ log.info("Scanning log file {}", logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the
log file
@@ -310,7 +318,7 @@ public abstract class AbstractHoodieLogRecordScanner {
totalLogBlocks.incrementAndGet();
// Ignore the corrupt blocks. No further handling is required for them.
if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
- LOG.info("Found a corrupt block in {}", logFile.getPath());
+ log.info("Found a corrupt block in {}", logFile.getPath());
totalCorruptBlocks.incrementAndGet();
continue;
}
@@ -347,7 +355,7 @@ public abstract class AbstractHoodieLogRecordScanner {
instantToBlocksMap.put(instantTime, logBlocksList);
break;
case COMMAND_BLOCK:
- LOG.info("Reading a command block from file {}",
logFile.getPath());
+ log.info("Reading a command block from file {}",
logFile.getPath());
// This is a command block - take appropriate action based on the
command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
@@ -367,8 +375,8 @@ public abstract class AbstractHoodieLogRecordScanner {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ordered instant times seen {}", orderedInstantsList);
+ if (log.isDebugEnabled()) {
+ log.debug("Ordered instant times seen {}", orderedInstantsList);
}
int numBlocksRolledBack = 0;
@@ -424,24 +432,24 @@ public abstract class AbstractHoodieLogRecordScanner {
validBlockInstants.add(compactedFinalInstantTime);
}
}
- LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
+ log.info("Number of applied rollback blocks {}", numBlocksRolledBack);
- if (LOG.isDebugEnabled()) {
- LOG.info("Final view of the Block time to compactionBlockMap {}",
blockTimeToCompactionBlockTimeMap);
+ if (log.isDebugEnabled()) {
+ log.info("Final view of the Block time to compactionBlockMap {}",
blockTimeToCompactionBlockTimeMap);
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
- LOG.info("Merging the final data blocks");
+ log.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
} catch (IOException e) {
- LOG.error("Got IOException when reading log file", e);
+ log.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
+ log.error("Got exception when reading log file", e);
throw new HoodieException("Exception when reading log file ", e);
} finally {
try {
@@ -450,7 +458,7 @@ public abstract class AbstractHoodieLogRecordScanner {
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that
can happen
- LOG.error("Unable to close log format reader", ioe);
+ log.error("Unable to close log format reader", ioe);
}
}
}
@@ -506,7 +514,7 @@ public abstract class AbstractHoodieLogRecordScanner {
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks,
int numLogFilesSeen,
Option<KeySpec> keySpecOpt)
throws Exception {
while (!logBlocks.isEmpty()) {
- LOG.info("Number of remaining logblocks to merge {}", logBlocks.size());
+ log.info("Number of remaining logblocks to merge {}", logBlocks.size());
// poll the element at the bottom of the stack since that's the order it
was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
@@ -519,7 +527,7 @@ public abstract class AbstractHoodieLogRecordScanner {
Arrays.stream(((HoodieDeleteBlock)
lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
break;
case CORRUPT_BLOCK:
- LOG.warn("Found a corrupt block which was not rolled back");
+ log.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
@@ -535,13 +543,6 @@ public abstract class AbstractHoodieLogRecordScanner {
return !forceFullScan;
}
- /**
- * Return progress of scanning as a float between 0.0 to 1.0.
- */
- public float getProgress() {
- return progress;
- }
-
public long getTotalLogFiles() {
return totalLogFiles.get();
}
@@ -554,14 +555,6 @@ public abstract class AbstractHoodieLogRecordScanner {
return totalLogBlocks.get();
}
- protected String getPayloadClassFQN() {
- return payloadClassFQN;
- }
-
- public Option<String> getPartitionNameOverride() {
- return partitionNameOverrideOpt;
- }
-
public long getTotalRollbacks() {
return totalRollbacks.get();
}
@@ -570,14 +563,6 @@ public abstract class AbstractHoodieLogRecordScanner {
return totalCorruptBlocks.get();
}
- public boolean isWithOperationField() {
- return withOperationField;
- }
-
- protected TypedProperties getPayloadProps() {
- return payloadProps;
- }
-
/**
* Key specification with a list of column names.
*/
@@ -595,16 +580,11 @@ public abstract class AbstractHoodieLogRecordScanner {
}
}
+ @AllArgsConstructor
+ @Getter
private static class FullKeySpec implements KeySpec {
- private final List<String> keys;
- private FullKeySpec(List<String> keys) {
- this.keys = keys;
- }
- @Override
- public List<String> getKeys() {
- return keys;
- }
+ private final List<String> keys;
@Override
public boolean isFullKey() {
@@ -612,12 +592,10 @@ public abstract class AbstractHoodieLogRecordScanner {
}
}
+ @AllArgsConstructor
private static class PrefixKeySpec implements KeySpec {
- private final List<String> keysPrefixes;
- private PrefixKeySpec(List<String> keysPrefixes) {
- this.keysPrefixes = keysPrefixes;
- }
+ private final List<String> keysPrefixes;
@Override
public List<String> getKeys() {
@@ -630,14 +608,6 @@ public abstract class AbstractHoodieLogRecordScanner {
}
}
- public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
- return currentInstantLogBlocks;
- }
-
- public List<String> getValidBlockInstants() {
- return validBlockInstants;
- }
-
private Pair<ClosableIterator<HoodieRecord>, HoodieSchema>
getRecordsIterator(
HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws
IOException {
ClosableIterator<HoodieRecord> blockRecordsIterator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 332a9c920c6e..159662f73bb9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -43,8 +43,9 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -74,9 +75,9 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
*
* @param <T> type of engine-specific record representation.
*/
+@Slf4j
public abstract class BaseHoodieLogRecordReader<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseHoodieLogRecordReader.class);
public static final String LOG_BLOCK_FULL_READ_DURATION_IN_MILLIS =
"logBlockFullReadDurationInMillis";
public static final String BLOCK_SIZE_IN_BYTES = "blockSizeInBytes";
public static final String TOTAL_RECORDS_PRESENT_IN_LOG_BLOCK =
"totalRecordsPresentInLogBlock";
@@ -89,13 +90,16 @@ public abstract class BaseHoodieLogRecordReader<T> {
protected final HoodieReaderContext<T> readerContext;
protected final HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log
+ @Getter(AccessLevel.PROTECTED)
private final String payloadClassFQN;
// Record's key/partition-path fields
private final String recordKeyField;
// Partition name override
+ @Getter
private final Option<String> partitionNameOverrideOpt;
// Ordering fields
protected final String orderingFields;
+ @Getter(AccessLevel.PROTECTED)
private final TypedProperties payloadProps;
// Log File Paths
protected final List<HoodieLogFile> logFiles;
@@ -107,6 +111,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
// optional instant range for incremental block filtering
private final Option<InstantRange> instantRange;
// Read the operation metadata field from the avro record
+ @Getter
private final boolean withOperationField;
// FileSystem
private final HoodieStorage storage;
@@ -129,15 +134,18 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Scan duration in milliseconds
private AtomicLong blocksScanDuration = new AtomicLong(0);
// Store the last instant log blocks (needed to implement rollback)
+ @Getter
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
protected final boolean forceFullScan;
// Progress
+ @Getter
private float progress = 0.0f;
- // Record type read from log block
// Collect all the block instants after scanning all the log files.
+ @Getter
private final List<String> validBlockInstants = new ArrayList<>();
// Block-level scan stats for processed data blocks.
+ @Getter
private List<Map<String, Object>> blocksStats = new ArrayList<>();
protected HoodieFileGroupRecordBuffer<T> recordBuffer;
// Allows to consider inflight instants while merging log records
@@ -271,7 +279,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
*/
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
- LOG.debug("Scanning log file {}", logFile);
+ log.debug("Scanning log file {}", logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the
log file
@@ -279,11 +287,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
logBlock.getBlockContentLocation()
.map(contentLocation ->
totalLogBlocksSize.addAndGet(contentLocation.getBlockSize()));
final String instantTime =
logBlock.getLogBlockHeader().get(INSTANT_TIME);
- LOG.debug("Scanning log block with instant time {}", instantTime);
+ log.debug("Scanning log block with instant time {}", instantTime);
totalLogBlocks.incrementAndGet();
// Ignore the corrupt blocks. No further handling is required for them.
if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
- LOG.debug("Found a corrupt block in {}", logFile.getPath());
+ log.debug("Found a corrupt block in {}", logFile.getPath());
totalCorruptBlocks.incrementAndGet();
continue;
}
@@ -314,7 +322,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
case DELETE_BLOCK:
- LOG.debug("Reading a {} block with instant time {}",
+ log.debug("Reading a {} block with instant time {}",
logBlock.getBlockType() ==
HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK ? "delete" : "data",
instantTime);
List<HoodieLogBlock> logBlocksList =
instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
@@ -326,7 +334,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
instantToBlocksMap.put(instantTime, logBlocksList);
break;
case COMMAND_BLOCK:
- LOG.debug("Reading a command block from file {}",
logFile.getPath());
+ log.debug("Reading a command block from file {}",
logFile.getPath());
// This is a command block - take appropriate action based on the
command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
@@ -341,10 +349,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
if (rolledBackBlocks != null) {
numBlocksRolledBack += rolledBackBlocks.size();
}
- LOG.debug("Reading a rollback block with instant {} and target
instant {}",
+ log.debug("Reading a rollback block with instant {} and target
instant {}",
instantTime, targetInstantForCommandBlock);
} else {
- LOG.error("Reading a command block with instant {} whose
operation is not supported", instantTime);
+ log.error("Reading a command block with instant {} whose
operation is not supported", instantTime);
throw new UnsupportedOperationException("Command type not yet
supported.");
}
break;
@@ -353,8 +361,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
}
- LOG.info("Ordered instant times seen {}", orderedInstantsList);
- LOG.info("Targeted instants that are rolled back are {}",
targetRollbackInstants);
+ log.info("Ordered instant times seen {}", orderedInstantsList);
+ log.info("Targeted instants that are rolled back are {}",
targetRollbackInstants);
// All the block's instants time that are added to the queue are
collected in this set.
Set<String> instantTimesIncluded = new HashSet<>();
@@ -377,7 +385,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
// For compacted blocks COMPACTED_BLOCK_TIMES entry is present under
its headers.
if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES))
{
- LOG.debug("For instant time {}, compacted block instants are {}",
+ log.debug("For instant time {}, compacted block instants are {}",
instantTime,
firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES));
// When compacted blocks are seen update the
blockTimeToCompactionBlockTimeMap.
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
@@ -410,20 +418,20 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
}
Collections.reverse(validBlockInstants);
- LOG.debug("Number of applied rollback blocks {}", numBlocksRolledBack);
- LOG.info("Total valid instants found are {}. Instants are {}",
validBlockInstants.size(), validBlockInstants);
+ log.debug("Number of applied rollback blocks {}", numBlocksRolledBack);
+ log.info("Total valid instants found are {}. Instants are {}",
validBlockInstants.size(), validBlockInstants);
if (ignoredBlockCount > 0) {
- LOG.info("Ignored {} log blocks from {} instants not in the range:
{}", ignoredBlockCount, ignoredInstants.size(), ignoredInstants);
+ log.info("Ignored {} log blocks from {} instants not in the range:
{}", ignoredBlockCount, ignoredInstants.size(), ignoredInstants);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Final view of the Block time to compactionBlockMap {}",
blockTimeToCompactionBlockTimeMap);
+ if (log.isDebugEnabled()) {
+ log.debug("Final view of the Block time to compactionBlockMap {}",
blockTimeToCompactionBlockTimeMap);
}
totalValidLogBlocks.set(currentInstantLogBlocks.size());
blocksScanDuration.set(scanTimer.endTimer());
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
- LOG.debug("Merging the final data blocks");
+ log.debug("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
}
// Done
@@ -432,10 +440,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
totalLogRecords.set(recordBuffer.getTotalLogRecords());
}
} catch (IOException e) {
- LOG.error("Got IOException when reading log file", e);
+ log.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
+ log.error("Got exception when reading log file", e);
throw new HoodieException("Exception when reading log file ", e);
} finally {
try {
@@ -444,18 +452,18 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that
can happen
- LOG.error("Unable to close log format reader", ioe);
+ log.error("Unable to close log format reader", ioe);
}
if (!logFiles.isEmpty()) {
try {
StoragePath path = logFiles.get(0).getPath();
- LOG.info("Finished scanning log files. FileId: {},
LogFileInstantTime: {}, "
+ log.info("Finished scanning log files. FileId: {},
LogFileInstantTime: {}, "
+ "Total log files: {}, Total log blocks: {}, Total
rollbacks: {}, Total corrupt blocks: {}",
FSUtils.getFileIdFromLogPath(path),
FSUtils.getDeltaCommitTimeFromLogPath(path),
totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(),
totalCorruptBlocks.get());
} catch (Exception e) {
- LOG.warn("Could not extract fileId from log path", e);
- LOG.info("Finished scanning log files. "
+ log.warn("Could not extract fileId from log path", e);
+ log.info("Finished scanning log files. "
+ "Total log files: {}, Total log blocks: {}, Total
rollbacks: {}, Total corrupt blocks: {}",
totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(),
totalCorruptBlocks.get());
}
@@ -469,7 +477,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks,
int numLogFilesSeen,
Option<KeySpec> keySpecOpt)
throws Exception {
while (!logBlocks.isEmpty()) {
- LOG.debug("Number of remaining logblocks to merge {}", logBlocks.size());
+ log.debug("Number of remaining logblocks to merge {}", logBlocks.size());
// poll the element at the bottom of the stack since that's the order it
was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
@@ -482,7 +490,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
recordBuffer.processDeleteBlock((HoodieDeleteBlock) lastBlock);
break;
case CORRUPT_BLOCK:
- LOG.warn("Found a corrupt block which was not rolled back");
+ log.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
@@ -494,7 +502,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws IOException {
String blockInstantTime = dataBlock.getLogBlockHeader().get(INSTANT_TIME);
- LOG.debug("Processing log block with instant time {}", blockInstantTime);
+ log.debug("Processing log block with instant time {}", blockInstantTime);
long totalLogRecordsBefore = recordBuffer != null ?
recordBuffer.getTotalLogRecords() : 0L;
HoodieTimer blockReadTimer = HoodieTimer.start();
recordBuffer.processDataBlock(dataBlock, keySpecOpt);
@@ -507,7 +515,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
.map(contentLocation -> blockReadMetrics.put(BLOCK_SIZE_IN_BYTES,
contentLocation.getBlockSize()));
blockReadMetrics.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME.toString(),
blockInstantTime);
blocksStats.add(blockReadMetrics);
- LOG.debug("For log block, scan metrics are {}", blockReadMetrics);
+ log.debug("For log block, scan metrics are {}", blockReadMetrics);
}
private boolean shouldLookupRecords() {
@@ -516,13 +524,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
return !forceFullScan;
}
- /**
- * Return progress of scanning as a float between 0.0 to 1.0.
- */
- public float getProgress() {
- return progress;
- }
-
public long getTotalLogFiles() {
return totalLogFiles.get();
}
@@ -543,22 +544,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
return totalValidLogBlocks.get();
}
- public List<Map<String, Object>> getBlocksStats() {
- return blocksStats;
- }
-
public long getBlocksScanDuration() {
return blocksScanDuration.get();
}
- protected String getPayloadClassFQN() {
- return payloadClassFQN;
- }
-
- public Option<String> getPartitionNameOverride() {
- return partitionNameOverrideOpt;
- }
-
public long getTotalRollbacks() {
return totalRollbacks.get();
}
@@ -567,22 +556,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
return totalCorruptBlocks.get();
}
- public boolean isWithOperationField() {
- return withOperationField;
- }
-
- protected TypedProperties getPayloadProps() {
- return payloadProps;
- }
-
- public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
- return currentInstantLogBlocks;
- }
-
- public List<String> getValidBlockInstants() {
- return validBlockInstants;
- }
-
/**
* Builder used to build {@code AbstractHoodieLogRecordScanner}.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
index ede7918649b4..a186abd73e9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
@@ -19,6 +19,9 @@
package org.apache.hudi.common.table.log;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.util.List;
/**
@@ -26,17 +29,11 @@ import java.util.List;
* That is, the comparison between a record key and an element
* of the set is {@link String#equals}.
*/
+@AllArgsConstructor
+@Getter
public class FullKeySpec implements KeySpec {
- private final List<String> keys;
-
- public FullKeySpec(List<String> keys) {
- this.keys = keys;
- }
- @Override
- public List<String> getKeys() {
- return keys;
- }
+ private final List<String> keys;
@Override
public boolean isFullKey() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 2c1d91e7b7c1..03ae4a2c4c07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -44,8 +44,8 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@@ -63,14 +63,15 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* Scans a log file and provides block level iterator on the log file Loads
the entire block contents in memory Can emit
* either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is
found).
*/
+@Slf4j
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
- private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieLogFileReader.class);
+ private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024;
private static final String REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED =
"Reverse log reader has not been enabled";
private final HoodieStorage storage;
+ @Getter
private final HoodieLogFile logFile;
private final int bufferSize;
private final byte[] magicBuffer = new byte[6];
@@ -118,11 +119,6 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
}
}
- @Override
- public HoodieLogFile getLogFile() {
- return logFile;
- }
-
// TODO : convert content and block length to long by using ByteBuffer, raw
byte [] allows
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
@@ -244,12 +240,12 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
}
private HoodieLogBlock createCorruptBlock(long blockStartPos) throws
IOException {
- LOG.info("Log {} has a corrupted block at {}", logFile, blockStartPos);
+ log.info("Log {} has a corrupted block at {}", logFile, blockStartPos);
inputStream.seek(blockStartPos);
long nextBlockOffset = scanForNextAvailableBlockOffset();
// Rewind to the initial start and read corrupted bytes till the
nextBlockOffset
inputStream.seek(blockStartPos);
- LOG.info("Next available block in {} starts at {}", logFile,
nextBlockOffset);
+ log.info("Next available block in {} starts at {}", logFile,
nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
long contentPosition = inputStream.getPos();
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream,
corruptedBlockSize, true);
@@ -276,7 +272,7 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
// So we have to shorten the footer block size by the size of magic hash
blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
} catch (EOFException e) {
- LOG.info("Found corrupted block in file {} with block size({}) running
past EOF", logFile, blocksize);
+ log.info("Found corrupted block in file {} with block size({}) running
past EOF", logFile, blocksize);
// this is corrupt
// This seek is required because contract of seek() is different for
naked DFSInputStream vs BufferedFSInputStream
// release-3.1.0-RC1/DFSInputStream.java#L1455
@@ -286,7 +282,7 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
}
if (blocksize != blockSizeFromFooter) {
- LOG.info("Found corrupted block in file {}. Header block size({}) did
not match the footer block size({})", logFile, blocksize, blockSizeFromFooter);
+ log.info("Found corrupted block in file {}. Header block size({}) did
not match the footer block size({})", logFile, blocksize, blockSizeFromFooter);
inputStream.seek(currentPos);
return true;
}
@@ -297,7 +293,7 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
return false;
} catch (CorruptedLogFileException e) {
// This is a corrupted block
- LOG.info("Found corrupted block in file {}. No magic hash found right
after footer block size entry", logFile);
+ log.info("Found corrupted block in file {}. No magic hash found right
after footer block size entry", logFile);
return true;
} finally {
inputStream.seek(currentPos);
@@ -330,7 +326,7 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
@Override
public void close() throws IOException {
if (!closed) {
- LOG.info("Closing Log file reader {}", logFile.getFileName());
+ log.info("Closing Log file reader {}", logFile.getFileName());
if (null != this.inputStream) {
this.inputStream.close();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index a675e0d9da89..3877a682240d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -246,18 +248,12 @@ public interface HoodieLogFormat {
* A set of feature flags associated with a log format. Versions are changed
when the log format changes. TODO(na) -
* Implement policies around major/minor versions
*/
+ @AllArgsConstructor(access = AccessLevel.PACKAGE)
+ @Getter
abstract class LogFormatVersion {
private final int version;
- LogFormatVersion(int version) {
- this.version = version;
- }
-
- public int getVersion() {
- return version;
- }
-
public abstract boolean hasMagicHeader();
public abstract boolean hasContent();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 77c3e78fcc32..1c342fecf39c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -25,8 +25,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
@@ -34,6 +33,7 @@ import java.util.List;
/**
* Hoodie log format reader.
*/
+@Slf4j
public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final List<HoodieLogFile> logFiles;
@@ -45,8 +45,6 @@ public class HoodieLogFormatReader implements
HoodieLogFormat.Reader {
private final boolean enableInlineReading;
private final int bufferSize;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieLogFormatReader.class);
-
HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles,
HoodieSchema readerSchema,
boolean reverseLogReader, int bufferSize, boolean
enableRecordLookups,
String recordKeyField, InternalSchema internalSchema)
throws IOException {
@@ -91,7 +89,7 @@ public class HoodieLogFormatReader implements
HoodieLogFormat.Reader {
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file
", io);
}
- LOG.debug("Moving to the next reader for logfile {}",
currentReader.getLogFile());
+ log.debug("Moving to the next reader for logfile {}",
currentReader.getLogFile());
return hasNext();
}
return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index b3f1eeaa988e..1ee8e7560d05 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -34,8 +34,8 @@ import org.apache.hudi.expression.Predicates;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.Serializable;
@@ -52,9 +52,11 @@ import static
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
*
* @param <T> type of engine-specific record representation.
*/
+@Getter
+@Slf4j
public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
implements Iterable<BufferedRecord<T>>, Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
+
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = HoodieTimer.create();
// count of merged records in log
@@ -102,8 +104,8 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
this.numMergedRecordsInLog = recordBuffer.size();
- LOG.info("Number of log files scanned => {}", logFiles.size());
- LOG.info("Number of entries in Map => {}", recordBuffer.size());
+ log.info("Number of log files scanned => {}", logFiles.size());
+ log.info("Number of entries in Map => {}", recordBuffer.size());
}
static Option<KeySpec> createKeySpec(Option<Predicate> filter) {
@@ -134,10 +136,6 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return recordBuffer.getLogRecords();
}
- public long getNumMergedRecordsInLog() {
- return numMergedRecordsInLog;
- }
-
/**
* Returns the builder for {@code HoodieMergedLogRecordReader}.
*/
@@ -145,10 +143,6 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return new Builder<>();
}
- public long getTotalTimeTakenToReadAndMergeBlocks() {
- return totalTimeTakenToReadAndMergeBlocks;
- }
-
@Override
public void close() {
// No op.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 318533859746..f7521fc207c5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,8 +47,8 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.concurrent.NotThreadSafe;
@@ -81,10 +81,10 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkArgument;
* This results in two I/O passes over the log file.
*/
@NotThreadSafe
+@Slf4j
public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
implements Iterable<HoodieRecord>, Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = HoodieTimer.create();
// Map of compacted/merged records
@@ -92,9 +92,11 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
// Set of already scanned prefixes allowing us to avoid scanning same
prefixes again
private final Set<String> scannedPrefixes;
// count of merged records in log
+ @Getter
private long numMergedRecordsInLog;
private final long maxMemorySizeInBytes;
// Stores the total time taken to perform reading and merging of log blocks
+ @Getter
private long totalTimeTakenToReadAndMergeBlocks;
private final String[] orderingFields;
private final DeleteContext deleteContext;
@@ -220,8 +222,8 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
this.numMergedRecordsInLog = records.size();
- if (LOG.isInfoEnabled()) {
- LOG.info("Scanned {} log files with stats: MaxMemoryInBytes => {},
MemoryBasedMap => {} entries, {} total bytes, DiskBasedMap => {} entries, {}
total bytes",
+ if (log.isInfoEnabled()) {
+ log.info("Scanned {} log files with stats: MaxMemoryInBytes => {},
MemoryBasedMap => {} entries, {} total bytes, DiskBasedMap => {} entries, {}
total bytes",
logFilePaths.size(), maxMemorySizeInBytes,
records.getInMemoryMapNumEntries(), records.getCurrentInMemoryMapSize(),
records.getDiskBasedMapNumEntries(),
records.getSizeOfFileOnDiskInBytes());
}
@@ -240,10 +242,6 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
return recordMerger.getRecordType();
}
- public long getNumMergedRecordsInLog() {
- return numMergedRecordsInLog;
- }
-
/**
* Returns the builder for {@code HoodieMergedLogRecordScanner}.
*/
@@ -313,10 +311,6 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
}
}
- public long getTotalTimeTakenToReadAndMergeBlocks() {
- return totalTimeTakenToReadAndMergeBlocks;
- }
-
@Override
public void close() {
if (records != null) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 9dd56cc66182..eda285d478d8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -21,6 +21,10 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
@@ -37,6 +41,7 @@ import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
/**
* An instant range used for incremental reader filtering.
*/
+@Getter
public abstract class InstantRange implements Serializable {
private static final long serialVersionUID = 1L;
@@ -55,14 +60,6 @@ public abstract class InstantRange implements Serializable {
return new Builder();
}
- public Option<String> getStartInstant() {
- return startInstantOpt;
- }
-
- public Option<String> getEndInstant() {
- return endInstantOpt;
- }
-
public abstract boolean isInRange(String instant);
public abstract RangeType getRangeType();
@@ -248,6 +245,7 @@ public abstract class InstantRange implements Serializable {
/**
* Builder for {@link InstantRange}.
*/
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class Builder {
private String startInstant;
private String endInstant;
@@ -256,9 +254,6 @@ public abstract class InstantRange implements Serializable {
private Set<String> explicitInstants;
private List<InstantRange> instantRanges;
- private Builder() {
- }
-
public Builder startInstant(String startInstant) {
this.startInstant = startInstant;
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index 965c57309b65..ea4c25db661a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
+import lombok.Getter;
+
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +32,7 @@ import java.util.function.Supplier;
/**
* Command block issues a specific command to the scanner.
*/
+@Getter
public class HoodieCommandBlock extends HoodieLogBlock {
private final HoodieCommandBlockTypeEnum type;
@@ -53,10 +56,6 @@ public class HoodieCommandBlock extends HoodieLogBlock {
HoodieCommandBlockTypeEnum.values()[Integer.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
}
- public HoodieCommandBlockTypeEnum getType() {
- return type;
- }
-
@Override
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.COMMAND_BLOCK;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 70ac694dedfe..dff48985f646 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -30,8 +30,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -57,8 +57,8 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* 2. Total number of records in the block
* 3. Actual serialized content of the records
*/
+@Slf4j
public abstract class HoodieDataBlock extends HoodieLogBlock {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieDataBlock.class);
// TODO rebase records/content to leverage Either to warrant
// that they are mutex (used by read/write flows respectively)
@@ -67,6 +67,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
/**
* Key field's name w/in the record's schema
*/
+ @Getter
private final String keyFieldName;
private final boolean enablePointLookups;
@@ -132,10 +133,6 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
return serializeRecords(records.get(), storage);
}
- public String getKeyFieldName() {
- return keyFieldName;
- }
-
public boolean containsPartialUpdates() {
return getLogBlockHeader().containsKey(HeaderMetadataType.IS_PARTIAL)
&&
Boolean.parseBoolean(getLogBlockHeader().get(HeaderMetadataType.IS_PARTIAL));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 54e7f301e6a8..0d69dab14258 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -32,6 +32,7 @@ import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.util.Lazy;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
@@ -41,8 +42,6 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -62,8 +61,9 @@ import static
org.apache.hudi.avro.HoodieAvroWrapperUtils.wrapValueIntoAvro;
/**
* Delete block contains a list of keys to be deleted from scanning the blocks
so far.
*/
+@Slf4j
public class HoodieDeleteBlock extends HoodieLogBlock {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieDeleteBlock.class);
+
/**
* These static builders are added to avoid performance issue in Avro 1.10.
* You can find more details in HoodieAvroUtils, HUDI-3834, and AVRO-3048.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index bc80e279a194..95267b154ba5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -38,6 +38,7 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.inline.InLineFSUtils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import java.io.ByteArrayOutputStream;
@@ -56,6 +57,7 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* HoodieHFileDataBlock contains a list of records stored inside an HFile
format. It is used with the HFile
* base file format.
*/
+@Slf4j
public class HoodieHFileDataBlock extends HoodieDataBlock {
private final Map<String, String> writerParams;
// This path is used for constructing HFile reader context, which should not
be
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 554b83b3455d..0b703ed7ac91 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -28,9 +28,11 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -55,8 +57,11 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Abstract class defining a block in HoodieLogFile.
*/
+@AllArgsConstructor
+@Getter
+@Slf4j
public abstract class HoodieLogBlock {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieLogBlock.class);
+
/**
* The current version of the log block. Anytime the logBlock format changes
this version needs to be bumped and
* corresponding changes need to be made to {@link HoodieLogBlockVersion}
TODO : Change this to a class, something
@@ -65,32 +70,24 @@ public abstract class HoodieLogBlock {
*/
public static int version = 3;
// Header for each log block
+ @Nonnull
private final Map<HeaderMetadataType, String> logBlockHeader;
// Footer for each log block
+ @Nonnull
private final Map<FooterMetadataType, String> logBlockFooter;
// Location of a log block on disk
+ @Nonnull
private final Option<HoodieLogBlockContentLocation> blockContentLocation;
// data for a specific block
+ @Nonnull
private Option<byte[]> content;
+ @Getter(AccessLevel.PROTECTED)
+ @Nullable
private final Supplier<SeekableDataInputStream> inputStreamSupplier;
// Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory
intensive)
+ @Getter(AccessLevel.NONE)
protected boolean readBlockLazily;
- public HoodieLogBlock(
- @Nonnull Map<HeaderMetadataType, String> logBlockHeader,
- @Nonnull Map<FooterMetadataType, String> logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
- @Nonnull Option<byte[]> content,
- @Nullable Supplier<SeekableDataInputStream> inputStreamSupplier,
- boolean readBlockLazily) {
- this.logBlockHeader = logBlockHeader;
- this.logBlockFooter = logBlockFooter;
- this.blockContentLocation = blockContentLocation;
- this.content = content;
- this.inputStreamSupplier = inputStreamSupplier;
- this.readBlockLazily = readBlockLazily;
- }
-
// Return the bytes representation of the data belonging to a LogBlock
public ByteArrayOutputStream getContentBytes(HoodieStorage storage) throws
IOException {
throw new HoodieException("No implementation was provided");
@@ -110,22 +107,6 @@ public abstract class HoodieLogBlock {
throw new HoodieException("No implementation was provided");
}
- public Option<HoodieLogBlockContentLocation> getBlockContentLocation() {
- return this.blockContentLocation;
- }
-
- public Map<HeaderMetadataType, String> getLogBlockHeader() {
- return logBlockHeader;
- }
-
- public Map<FooterMetadataType, String> getLogBlockFooter() {
- return logBlockFooter;
- }
-
- public Option<byte[]> getContent() {
- return content;
- }
-
/**
* Compacted blocks are created using log compaction which basically merges
the consecutive blocks together and create
* huge block with all the changes.
@@ -161,10 +142,10 @@ public abstract class HoodieLogBlock {
try {
logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS,
LogReaderUtils.encodePositions(positionSet));
} catch (IOException e) {
- LOG.error("Cannot write record positions to the log block header.", e);
+ log.error("Cannot write record positions to the log block header.", e);
}
} else {
- LOG.warn("There are duplicate keys in the records (number of unique
positions: {}, "
+ log.warn("There are duplicate keys in the records (number of unique
positions: {}, "
+ "number of records: {}). Skip writing record positions to the
log block header.",
positionSet.size(), numRecords);
}
@@ -176,7 +157,7 @@ public abstract class HoodieLogBlock {
}
protected void removeBaseFileInstantTimeOfPositions() {
- LOG.info("There are records without valid positions. "
+ log.info("There are records without valid positions. "
+ "Skip writing record positions to the block header.");
logBlockHeader.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
}
@@ -252,7 +233,10 @@ public abstract class HoodieLogBlock {
* This class is used to store the Location of the Content of a Log Block.
It's used when a client chooses for a IO
* intensive CompactedScanner, the location helps to lazily read contents
from the log file
*/
+ @AllArgsConstructor
+ @Getter
public static final class HoodieLogBlockContentLocation {
+
// Storage Config required to access the file
private final HoodieStorage storage;
// The logFile that contains this block
@@ -263,38 +247,6 @@ public abstract class HoodieLogBlock {
private final long blockSize;
// The final position where the complete block ends
private final long blockEndPos;
-
- public HoodieLogBlockContentLocation(HoodieStorage storage,
- HoodieLogFile logFile,
- long contentPositionInLogFile,
- long blockSize,
- long blockEndPos) {
- this.storage = storage;
- this.logFile = logFile;
- this.contentPositionInLogFile = contentPositionInLogFile;
- this.blockSize = blockSize;
- this.blockEndPos = blockEndPos;
- }
-
- public HoodieStorage getStorage() {
- return storage;
- }
-
- public HoodieLogFile getLogFile() {
- return logFile;
- }
-
- public long getContentPositionInLogFile() {
- return contentPositionInLogFile;
- }
-
- public long getBlockSize() {
- return blockSize;
- }
-
- public long getBlockEndPos() {
- return blockEndPos;
- }
}
/**
@@ -359,10 +311,6 @@ public abstract class HoodieLogBlock {
return Option.of(baos);
}
- protected Supplier<SeekableDataInputStream> getInputStreamSupplier() {
- return inputStreamSupplier;
- }
-
/**
* Adds the record positions if the base file instant time of the positions
exists
* in the log header and the record positions are all valid.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index d68e608463df..fb78adaa772c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -23,6 +23,10 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.OrderingValues;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -34,41 +38,22 @@ import java.util.function.UnaryOperator;
*
* @param <T> The type of the engine specific row.
*/
+@AllArgsConstructor
+@Getter
public class BufferedRecord<T> implements Serializable {
+
private String recordKey;
- private T record;
private final Comparable orderingValue;
+ private T record;
private final Integer schemaId;
- @Nullable private HoodieOperation hoodieOperation;
+ @Nullable
+ @Setter
+ private HoodieOperation hoodieOperation;
public BufferedRecord() {
this(null, null, null, null, null);
}
- public BufferedRecord(String recordKey, Comparable orderingValue, T record,
Integer schemaId, @Nullable HoodieOperation hoodieOperation) {
- this.recordKey = recordKey;
- this.orderingValue = orderingValue;
- this.record = record;
- this.schemaId = schemaId;
- this.hoodieOperation = hoodieOperation;
- }
-
- public String getRecordKey() {
- return recordKey;
- }
-
- public Comparable getOrderingValue() {
- return orderingValue;
- }
-
- public T getRecord() {
- return record;
- }
-
- public Integer getSchemaId() {
- return schemaId;
- }
-
public boolean isDelete() {
return HoodieOperation.isDelete(hoodieOperation) ||
HoodieOperation.isUpdateBefore(hoodieOperation);
}
@@ -81,14 +66,6 @@ public class BufferedRecord<T> implements Serializable {
return isDelete() && OrderingValues.isDefault(orderingValue);
}
- public void setHoodieOperation(HoodieOperation hoodieOperation) {
- this.hoodieOperation = hoodieOperation;
- }
-
- public HoodieOperation getHoodieOperation() {
- return this.hoodieOperation;
- }
-
public BufferedRecord<T> toBinary(RecordContext<T> recordContext) {
if (record != null) {
HoodieSchema schema = recordContext.getSchemaFromBufferRecord(this);
@@ -124,6 +101,8 @@ public class BufferedRecord<T> implements Serializable {
return this;
}
+ // Intentionally not using @EqualsAndHashCode: Lombok generates
instanceof/canEqual based equality,
+ // while this class requires exact runtime-class equality via getClass()
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 81d2de0ac0a2..06e903dd1fa4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -33,16 +33,17 @@ import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
import java.io.IOException;
/**
* Factory to create a {@link BufferedRecordMerger}.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class BufferedRecordMergerFactory {
- private BufferedRecordMergerFactory() {
- }
-
public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T>
readerContext,
RecordMergeMode
recordMergeMode,
boolean
enablePartialMerging,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
index ce6857dc2aeb..f1bd8b0b0a8a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
@@ -27,6 +27,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
import java.io.Serializable;
import java.util.Properties;
@@ -37,10 +40,13 @@ import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
/**
* Schema context for deletes.
*/
+@Getter
public class DeleteContext implements Serializable {
+
private static final long serialVersionUID = 1L;
private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
+ @Accessors(fluent = true)
private final boolean hasBuiltInDeleteField;
private int hoodieOperationPos;
private HoodieSchema readerSchema;
@@ -108,25 +114,9 @@ public class DeleteContext implements Serializable {
.orElseGet(() -> -1);
}
- public Option<Pair<String, String>> getCustomDeleteMarkerKeyValue() {
- return customDeleteMarkerKeyValue;
- }
-
- public boolean hasBuiltInDeleteField() {
- return hasBuiltInDeleteField;
- }
-
- public int getHoodieOperationPos() {
- return hoodieOperationPos;
- }
-
public DeleteContext withReaderSchema(HoodieSchema readerSchema) {
this.readerSchema = readerSchema;
this.hoodieOperationPos = getHoodieOperationPos(readerSchema);
return this;
}
-
- public HoodieSchema getReaderSchema() {
- return readerSchema;
- }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 01f3e513e341..da0bfb567abe 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -32,8 +32,11 @@ import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@@ -108,11 +111,13 @@ import java.util.stream.Stream;
*
* <p>IMPORTANT: the reader may optionally choose to fall back to reading the
latest snapshot if there are files decoding the commit metadata are already
cleaned.
*/
+@Slf4j
public class IncrementalQueryAnalyzer {
+
public static final String START_COMMIT_EARLIEST = "earliest";
- private static final Logger LOG =
LoggerFactory.getLogger(IncrementalQueryAnalyzer.class);
private final HoodieTableMetaClient metaClient;
+ @Getter
private final Option<String> startCompletionTime;
private final Option<String> endCompletionTime;
private final InstantRange.RangeType rangeType;
@@ -143,10 +148,6 @@ public class IncrementalQueryAnalyzer {
this.limit = limit;
}
- public Option<String> getStartCompletionTime() {
- return startCompletionTime;
- }
-
/**
* Returns a builder.
*/
@@ -206,7 +207,7 @@ public class IncrementalQueryAnalyzer {
String endInstant = endCompletionTime.isEmpty() ? null : lastInstant;
return QueryContext.create(startInstant, endInstant, instants,
archivedInstants, activeInstants, filteredTimeline, archivedReadTimeline);
} catch (Exception ex) {
- LOG.error("Got exception when generating incremental query info", ex);
+ log.error("Got exception when generating incremental query info", ex);
throw new HoodieException(ex);
}
}
@@ -284,6 +285,7 @@ public class IncrementalQueryAnalyzer {
/**
* Builder for {@link IncrementalQueryAnalyzer}.
*/
+ @NoArgsConstructor
public static class Builder {
/**
* Start completion time.
@@ -304,9 +306,6 @@ public class IncrementalQueryAnalyzer {
*/
private int limit = -1;
- public Builder() {
- }
-
public Builder startCompletionTime(String startCompletionTime) {
this.startCompletionTime = startCompletionTime;
return this;
@@ -361,9 +360,12 @@ public class IncrementalQueryAnalyzer {
/**
* Represents the analyzed query context.
*/
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @Getter
public static class QueryContext {
+
public static final QueryContext EMPTY =
- new QueryContext(null, null, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), null, null);
+ new QueryContext(Option.empty(), Option.empty(),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
null, null);
/**
* An empty option indicates consumption from the earliest instant.
@@ -373,6 +375,8 @@ public class IncrementalQueryAnalyzer {
* An empty option indicates consumption to the latest instant.
*/
private final Option<String> endInstant;
+ @Getter(AccessLevel.NONE)
+ private final List<String> instants;
private final List<HoodieInstant> archivedInstants;
private final List<HoodieInstant> activeInstants;
/**
@@ -382,25 +386,9 @@ public class IncrementalQueryAnalyzer {
/**
* The archived timeline to read filtered by given configurations.
*/
+ @Nullable
+ @Getter(AccessLevel.NONE)
private final HoodieTimeline archivedTimeline;
- private final List<String> instants;
-
- private QueryContext(
- @Nullable String startInstant,
- @Nullable String endInstant,
- List<String> instants,
- List<HoodieInstant> archivedInstants,
- List<HoodieInstant> activeInstants,
- HoodieTimeline activeTimeline,
- @Nullable HoodieTimeline archivedTimeline) {
- this.startInstant = Option.ofNullable(startInstant);
- this.endInstant = Option.ofNullable(endInstant);
- this.archivedInstants = archivedInstants;
- this.activeInstants = activeInstants;
- this.activeTimeline = activeTimeline;
- this.archivedTimeline = archivedTimeline;
- this.instants = instants;
- }
public static QueryContext create(
@Nullable String startInstant,
@@ -410,7 +398,7 @@ public class IncrementalQueryAnalyzer {
List<HoodieInstant> activeInstants,
HoodieTimeline activeTimeline,
@Nullable HoodieTimeline archivedTimeline) {
- return new QueryContext(startInstant, endInstant, instants,
archivedInstants, activeInstants, activeTimeline, archivedTimeline);
+ return new QueryContext(Option.ofNullable(startInstant),
Option.ofNullable(endInstant), instants, archivedInstants, activeInstants,
activeTimeline, archivedTimeline);
}
public boolean isEmpty() {
@@ -421,14 +409,6 @@ public class IncrementalQueryAnalyzer {
return this.instants;
}
- public Option<String> getStartInstant() {
- return startInstant;
- }
-
- public Option<String> getEndInstant() {
- return endInstant;
- }
-
/**
* Returns the latest instant time which should be included physically in
reading.
*/
@@ -441,14 +421,6 @@ public class IncrementalQueryAnalyzer {
return Stream.concat(archivedInstants.stream(),
activeInstants.stream()).collect(Collectors.toList());
}
- public List<HoodieInstant> getArchivedInstants() {
- return archivedInstants;
- }
-
- public List<HoodieInstant> getActiveInstants() {
- return activeInstants;
- }
-
public boolean isConsumingFromEarliest() {
return startInstant.isEmpty();
}
@@ -489,10 +461,6 @@ public class IncrementalQueryAnalyzer {
}
}
- public HoodieTimeline getActiveTimeline() {
- return this.activeTimeline;
- }
-
public @Nullable HoodieTimeline getArchivedTimeline() {
return archivedTimeline;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index e84921c080ae..e8ff9add0ad8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
import java.util.List;
/**
@@ -40,16 +43,15 @@ import java.util.List;
*
* @param <T> the engine specific record type
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
+
private static final DefaultFileGroupRecordBufferLoader INSTANCE = new
DefaultFileGroupRecordBufferLoader<>();
static <T> DefaultFileGroupRecordBufferLoader<T> getInstance() {
return INSTANCE;
}
- private DefaultFileGroupRecordBufferLoader() {
- }
-
@Override
public Pair<HoodieFileGroupRecordBuffer<T>, List<String>>
getRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieStorage storage,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index 30f872421057..8bd1013f6643 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -50,6 +50,9 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
import org.apache.hudi.io.util.FileIOUtils;
+import lombok.Getter;
+import lombok.Setter;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
@@ -77,8 +80,10 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
protected final Option<Pair<String, String>> payloadClasses;
protected final TypedProperties props;
protected final ExternalSpillableMap<Serializable, BufferedRecord<T>>
records;
+ @Getter
protected final DeleteContext deleteContext;
protected final BufferedRecordConverter<T> bufferedRecordConverter;
+ @Setter
protected ClosableIterator<T> baseFileIterator;
protected UpdateProcessor<T> updateProcessor;
protected Iterator<BufferedRecord<T>> logRecordIterator;
@@ -87,6 +92,7 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
protected InternalSchema internalSchema;
protected HoodieTableMetaClient hoodieTableMetaClient;
protected BufferedRecordMerger<T> bufferedRecordMerger;
+ @Getter
protected long totalLogRecords = 0;
protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
@@ -131,15 +137,6 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
readerContext.getRecordSizeEstimator(), diskMapType,
readerContext.getRecordSerializer(), isBitCaskDiskMapCompressionEnabled,
getClass().getSimpleName());
}
- @Override
- public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
- this.baseFileIterator = baseFileIterator;
- }
-
- public DeleteContext getDeleteContext() {
- return deleteContext;
- }
-
/**
* This allows hasNext() to be called multiple times without incrementing
the iterator by more than 1
* record. It does come with the caveat that hasNext() must be called every
time before next(). But
@@ -169,10 +166,6 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
return records.size();
}
- public long getTotalLogRecords() {
- return totalLogRecords;
- }
-
@Override
public ClosableIterator<BufferedRecord<T>> getLogRecordIterator() {
return new LogRecordIterator<>(this);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 73dd5dda5e87..5eaa54e033c8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -41,9 +41,8 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
+import lombok.extern.slf4j.Slf4j;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -61,8 +60,8 @@ import java.util.function.Function;
* Here the position means that record position in the base file. The records
from the base file is accessed from an iterator object. These records are
merged when the
* {@link #hasNext} method is called.
*/
+@Slf4j
public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupRecordBuffer<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(PositionBasedFileGroupRecordBuffer.class);
private static final String ROW_INDEX_COLUMN_NAME = "row_index";
public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME =
"_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
@@ -96,7 +95,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
// Extract positions from data block.
List<Long> recordPositions = extractRecordPositions(dataBlock,
baseFileInstantTime);
if (recordPositions == null) {
- LOG.debug("Falling back to key based merge for data block");
+ log.debug("Falling back to key based merge for data block");
fallbackToKeyBasedBuffer();
super.processDataBlock(dataBlock, keySpecOpt);
return;
@@ -181,7 +180,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
List<Long> recordPositions = extractRecordPositions(deleteBlock,
baseFileInstantTime);
if (recordPositions == null) {
- LOG.debug("Falling back to key based merging for delete block");
+ log.debug("Falling back to key based merging for delete block");
fallbackToKeyBasedBuffer();
super.processDeleteBlock(deleteBlock);
return;
@@ -291,7 +290,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
String blockBaseFileInstantTime =
logBlock.getBaseFileInstantTimeOfPositions();
if (StringUtils.isNullOrEmpty(blockBaseFileInstantTime) ||
!baseFileInstantTime.equals(blockBaseFileInstantTime)) {
- LOG.debug("The record positions cannot be used because the base file
instant time "
+ log.debug("The record positions cannot be used because the base file
instant time "
+ "is either missing or different from the base file to merge. "
+ "Instant time in the header: {}, base file instant time of the
file group: {}.",
blockBaseFileInstantTime, baseFileInstantTime);
@@ -299,7 +298,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
}
Roaring64NavigableMap positions = logBlock.getRecordPositions();
if (positions == null || positions.isEmpty()) {
- LOG.info("No record position info is found when attempting to do
position based merge.");
+ log.info("No record position info is found when attempting to do
position based merge.");
return null;
}
@@ -309,7 +308,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
}
if (blockPositions.isEmpty()) {
- LOG.info("No positions are extracted.");
+ log.info("No positions are extracted.");
return null;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 7f6771ba3940..57b0a30c4cb1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -103,13 +103,13 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
out.writeBoolean(false);
}
- out.writeBoolean(instantRange.getStartInstant().isPresent());
- if (instantRange.getStartInstant().isPresent()) {
- out.writeUTF(instantRange.getStartInstant().get());
+ out.writeBoolean(instantRange.getStartInstantOpt().isPresent());
+ if (instantRange.getStartInstantOpt().isPresent()) {
+ out.writeUTF(instantRange.getStartInstantOpt().get());
}
- out.writeBoolean(instantRange.getEndInstant().isPresent());
- if (instantRange.getEndInstant().isPresent()) {
- out.writeUTF(instantRange.getEndInstant().get());
+ out.writeBoolean(instantRange.getEndInstantOpt().isPresent());
+ if (instantRange.getEndInstantOpt().isPresent()) {
+ out.writeUTF(instantRange.getEndInstantOpt().get());
}
if
(instantRange.getRangeType().equals(InstantRange.RangeType.EXACT_MATCH)) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index f509013f086b..472c8d74c3c8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -438,11 +438,11 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
IncrementalInputSplits.Result result = iis.inputSplits(metaClient,
firstInstant.getCompletionTime(), false);
String minStartCommit = result.getInputSplits().stream()
- .map(split ->
split.getInstantRange().get().getStartInstant().get())
+ .map(split ->
split.getInstantRange().get().getStartInstantOpt().get())
.min((commit1,commit2) -> compareTimestamps(commit1, LESSER_THAN,
commit2) ? 1 : 0)
.orElse(null);
String maxEndCommit = result.getInputSplits().stream()
- .map(split -> split.getInstantRange().get().getEndInstant().get())
+ .map(split ->
split.getInstantRange().get().getEndInstantOpt().get())
.max((commit1,commit2) -> compareTimestamps(commit1, GREATER_THAN,
commit2) ? 1 : 0)
.orElse(null);
assertEquals(0, intervalBetween2Instants(commitsTimeline, minStartCommit,
maxEndCommit), "Should read 1 instant");
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index ec6f3b863caf..bb28ee92e2df 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -505,8 +505,8 @@ public class TestStreamReadMonitoringFunction {
private static boolean isPointInstantRange(InstantRange instantRange, String
timestamp) {
return instantRange != null
- && Objects.equals(timestamp, instantRange.getStartInstant().get())
- && Objects.equals(timestamp, instantRange.getEndInstant().get());
+ && Objects.equals(timestamp, instantRange.getStartInstantOpt().get())
+ && Objects.equals(timestamp, instantRange.getEndInstantOpt().get());
}
private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit>
createHarness(
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
index 1cb244245ce8..8dd32af3adf3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -360,8 +360,8 @@ public class TestHoodieSourceSplit {
);
assertTrue(split.getInstantRange().isPresent());
- assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstant().get());
- assertEquals("20230131235959999",
split.getInstantRange().get().getEndInstant().get());
+ assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230131235959999",
split.getInstantRange().get().getEndInstantOpt().get());
}
@Test
@@ -402,9 +402,9 @@ public class TestHoodieSourceSplit {
);
assertTrue(split.getInstantRange().isPresent());
- assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
- assertFalse(split.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstant().get());
+ assertTrue(split.getInstantRange().get().getStartInstantOpt().isPresent());
+ assertFalse(split.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstantOpt().get());
}
@Test
@@ -476,7 +476,7 @@ public class TestHoodieSourceSplit {
);
assertTrue(split.getInstantRange().isPresent());
- assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
- assertTrue(split.getInstantRange().get().getEndInstant().isPresent());
+ assertTrue(split.getInstantRange().get().getStartInstantOpt().isPresent());
+ assertTrue(split.getInstantRange().get().getEndInstantOpt().isPresent());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 523606bb5f76..d56d18648c29 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -508,10 +508,10 @@ public class TestHoodieSourceSplitSerializer {
assertNotNull(deserialized);
assertTrue(deserialized.getInstantRange().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
- assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstantOpt().get());
}
@Test
@@ -539,9 +539,9 @@ public class TestHoodieSourceSplitSerializer {
assertNotNull(deserialized);
assertTrue(deserialized.getInstantRange().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertFalse(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstantOpt().get());
}
@Test
@@ -569,10 +569,10 @@ public class TestHoodieSourceSplitSerializer {
assertNotNull(deserialized);
assertTrue(deserialized.getInstantRange().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
- assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstantOpt().get());
}
@Test
@@ -604,8 +604,8 @@ public class TestHoodieSourceSplitSerializer {
assertTrue(deserialized.getInstantRange().isPresent());
assertEquals(10, deserialized.getFileOffset());
assertEquals(500L, deserialized.getConsumed());
- assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
- assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstantOpt().get());
}
@Test
@@ -636,13 +636,13 @@ public class TestHoodieSourceSplitSerializer {
// Verify split1
assertTrue(deserialized1.getInstantRange().isPresent());
- assertEquals("20230101000000000",
deserialized1.getInstantRange().get().getStartInstant().get());
- assertEquals("20230131235959999",
deserialized1.getInstantRange().get().getEndInstant().get());
+ assertEquals("20230101000000000",
deserialized1.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230131235959999",
deserialized1.getInstantRange().get().getEndInstantOpt().get());
// Verify split2
assertTrue(deserialized2.getInstantRange().isPresent());
- assertEquals("20230201000000000",
deserialized2.getInstantRange().get().getStartInstant().get());
-
assertFalse(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+ assertEquals("20230201000000000",
deserialized2.getInstantRange().get().getStartInstantOpt().get());
+
assertFalse(deserialized2.getInstantRange().get().getEndInstantOpt().isPresent());
// Verify split3
assertFalse(deserialized3.getInstantRange().isPresent());
@@ -862,15 +862,15 @@ public class TestHoodieSourceSplitSerializer {
assertTrue(deserialized2.getInstantRange().isPresent());
assertEquals(InstantRange.RangeType.OPEN_CLOSED,
deserialized2.getInstantRange().get().getRangeType());
- assertEquals("20230201000000000",
deserialized2.getInstantRange().get().getStartInstant().get());
- assertEquals("20230228235959999",
deserialized2.getInstantRange().get().getEndInstant().get());
+ assertEquals("20230201000000000",
deserialized2.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230228235959999",
deserialized2.getInstantRange().get().getEndInstantOpt().get());
// Verify split3 (CLOSED_CLOSED)
assertTrue(deserialized3.getInstantRange().isPresent());
assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
deserialized3.getInstantRange().get().getRangeType());
- assertEquals("20230301000000000",
deserialized3.getInstantRange().get().getStartInstant().get());
- assertEquals("20230331235959999",
deserialized3.getInstantRange().get().getEndInstant().get());
+ assertEquals("20230301000000000",
deserialized3.getInstantRange().get().getStartInstantOpt().get());
+ assertEquals("20230331235959999",
deserialized3.getInstantRange().get().getEndInstantOpt().get());
}
@Test
@@ -953,9 +953,9 @@ public class TestHoodieSourceSplitSerializer {
assertTrue(deserialized.getInstantRange().isPresent());
assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
deserialized.getInstantRange().get().getRangeType());
-
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertFalse(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstantOpt().get());
// Verify range behavior - start is inclusive, no end boundary
assertTrue(deserialized.getInstantRange().get().isInRange("20230101000000000"));
// start inclusive
@@ -991,9 +991,9 @@ public class TestHoodieSourceSplitSerializer {
assertTrue(deserialized.getInstantRange().isPresent());
assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
deserialized.getInstantRange().get().getRangeType());
-
assertFalse(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+
assertFalse(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstantOpt().get());
// Verify range behavior - no start boundary, end is inclusive
assertTrue(deserialized.getInstantRange().get().isInRange("19700101000000000"));
@@ -1028,9 +1028,9 @@ public class TestHoodieSourceSplitSerializer {
assertTrue(deserialized.getInstantRange().isPresent());
assertEquals(InstantRange.RangeType.OPEN_CLOSED,
deserialized.getInstantRange().get().getRangeType());
-
assertFalse(deserialized.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
- assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+
assertFalse(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstantOpt().get());
// Verify range behavior - no start boundary, end is inclusive
assertTrue(deserialized.getInstantRange().get().isInRange("19700101000000000"));
@@ -1086,23 +1086,23 @@ public class TestHoodieSourceSplitSerializer {
HoodieSourceSplit deserialized4 =
serializer.deserialize(serializer.getVersion(), serialized4);
// Verify OPEN_CLOSED with only start
-
assertTrue(deserialized1.getInstantRange().get().getStartInstant().isPresent());
-
assertFalse(deserialized1.getInstantRange().get().getEndInstant().isPresent());
+
assertTrue(deserialized1.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertFalse(deserialized1.getInstantRange().get().getEndInstantOpt().isPresent());
assertEquals(InstantRange.RangeType.OPEN_CLOSED,
deserialized1.getInstantRange().get().getRangeType());
// Verify OPEN_CLOSED with only end
-
assertFalse(deserialized2.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+
assertFalse(deserialized2.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized2.getInstantRange().get().getEndInstantOpt().isPresent());
assertEquals(InstantRange.RangeType.OPEN_CLOSED,
deserialized2.getInstantRange().get().getRangeType());
// Verify CLOSED_CLOSED with only start
-
assertTrue(deserialized3.getInstantRange().get().getStartInstant().isPresent());
-
assertFalse(deserialized3.getInstantRange().get().getEndInstant().isPresent());
+
assertTrue(deserialized3.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertFalse(deserialized3.getInstantRange().get().getEndInstantOpt().isPresent());
assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
deserialized3.getInstantRange().get().getRangeType());
// Verify CLOSED_CLOSED with only end
-
assertFalse(deserialized4.getInstantRange().get().getStartInstant().isPresent());
-
assertTrue(deserialized4.getInstantRange().get().getEndInstant().isPresent());
+
assertFalse(deserialized4.getInstantRange().get().getStartInstantOpt().isPresent());
+
assertTrue(deserialized4.getInstantRange().get().getEndInstantOpt().isPresent());
assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
deserialized4.getInstantRange().get().getRangeType());
}