This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch IGNITE-14353 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit ce391b6ea06fd6f5e93ca3904f703850a50ac793 Author: Nikolay Izhikov <[email protected]> AuthorDate: Tue Mar 23 20:04:05 2021 +0300 IGNITE-13581 Usage of built-in WalIterator filter --- .../src/main/java/org/apache/ignite/internal/cdc/IgniteCDC.java | 4 +++- .../java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java | 6 +----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/IgniteCDC.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/IgniteCDC.java index af4d785..31b363f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/IgniteCDC.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/IgniteCDC.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_PATH; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.NODE_PATTERN; import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.UUID_STR_PATTERN; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; @@ -309,7 +310,8 @@ public class IgniteCDC implements Runnable { .binaryMetadataFileStoreDir(binaryMeta) .marshallerMappingFileStoreDir(marshaller) .keepBinary(keepBinary) - .filesOrDirs(segment.toFile()); + .filesOrDirs(segment.toFile()) + .addFilter((type, ptr) -> type == DATA_RECORD_V2); if (initState != null) { long segmentIdx = segmentIndex(segment); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java index abe6b0a..f5e62df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -56,9 +55,6 @@ public class WALRecordsConsumer<K, V> { /** Operations types we interested in. */ private static final EnumSet<GridCacheOperation> OPS_TYPES = EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM); - /** WAL Records filter. */ - private static final IgnitePredicate<WALRecord> DATA_REC_FILTER = r -> r.type() == DATA_RECORD_V2; - /** Operations filter. */ private static final IgnitePredicate<? super DataEntry> OPS_FILTER = e -> { if (!(e instanceof UnwrappedDataEntry)) @@ -129,7 +125,7 @@ public class WALRecordsConsumer<K, V> { e.cacheId(), e.expireTime() ); - }, true, OPS_FILTER), true, DATA_REC_FILTER))); + }, true, OPS_FILTER), true))); } /**
