This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b041e2f32fa IGNITE-16757 Introduce cache change events for CDCCosumer
(#9948)
b041e2f32fa is described below
commit b041e2f32fabf2d06a224259a4c2647bbb8156b8
Author: Nikolay <[email protected]>
AuthorDate: Thu Jul 7 12:05:38 2022 +0300
IGNITE-16757 Introduce cache change events for CDCCosumer (#9948)
---
.../java/org/apache/ignite/cdc/CdcCacheEvent.java | 62 +++++
.../java/org/apache/ignite/cdc/CdcConsumer.java | 34 ++-
.../ignite/internal/cdc/CdcConsumerState.java | 37 +++
.../org/apache/ignite/internal/cdc/CdcMain.java | 114 ++++++++-
.../ignite/internal/cdc/WalRecordsConsumer.java | 19 ++
.../pagemem/store/IgnitePageStoreManager.java | 10 -
.../internal/processors/cache/CachesRegistry.java | 21 +-
.../processors/cache/GridCacheProcessor.java | 4 +-
.../internal/processors/cache/GridCacheUtils.java | 16 ++
.../processors/cache/GridLocalConfigManager.java | 142 ++++++----
.../internal/processors/cache/StoredCacheData.java | 16 +-
.../IgniteCacheDatabaseSharedManager.java | 15 +-
.../persistence/file/FilePageStoreManager.java | 11 +-
.../org/apache/ignite/cdc/AbstractCdcTest.java | 73 +++++-
.../ignite/cdc/CdcCacheConfigOnRestartTest.java | 185 +++++++++++++
.../java/org/apache/ignite/cdc/CdcSelfTest.java | 17 ++
.../persistence/pagemem/NoOpPageStoreManager.java | 5 -
.../ignite/testsuites/IgnitePdsTestSuite2.java | 2 +
.../ignite/internal/cdc/CacheEventsCdcTest.java | 285 +++++++++++++++++++++
.../org/apache/ignite/internal/cdc/SqlCdcTest.java | 109 ++++++--
.../IgniteBinaryCacheQueryTestSuite3.java | 4 +-
.../apache/ignite/cdc/CdcConfigurationTest.java | 10 +
22 files changed, 1063 insertions(+), 128 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java
new file mode 100644
index 00000000000..d006c7b18ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.spi.systemview.view.CacheView;
+
+/**
+ * Notification of {@link CdcConsumer} about cache creation/change events.
+ *
+ * @see CdcConsumer
+ * @see Ignite#createCache(String)
+ * @see IgniteCache
+ * @see CacheConfiguration
+ * @see QueryEntity
+ */
+@IgniteExperimental
+public interface CdcCacheEvent {
+ /**
+ * @return Cache ID.
+ * @see CacheView#cacheId()
+ */
+ public int cacheId();
+
+ /**
+ * Note, {@link CacheConfiguration#getQueryEntities()} value is not changed
on table schema change.
+ * Current table schema can be obtained by {@link #queryEntities()}.
+ *
+ * @return Initial cache configuration.
+ */
+ public CacheConfiguration<?, ?> configuration();
+
+ /**
+ * Returns current state of configured {@link QueryEntity}.
+ * {@link QueryEntity} can be changed by executing DDL on SQL tables.
+ *
+ * Note, {@link CacheConfiguration#getQueryEntities()} returns initial
definition of {@link QueryEntity}.
+ *
+ * @return Query entities for cache.
+ */
+ public Collection<QueryEntity> queryEntities();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index c95bc5ed12c..8e14dfd1e5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc;
import java.util.Iterator;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryIdMapper;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.systemview.view.CacheView;
/**
* Consumer of WAL data change events.
@@ -50,6 +52,14 @@ import org.apache.ignite.resources.LoggerResource;
*
* Note, consumption of the {@link CdcEvent} will be started from the last
saved offset.
* The offset of consumptions is saved on the disk every time {@link
#onEvents(Iterator)} returns {@code true}.
+ * Note, the order of notifications is as follows:
+ * <ul>
+ * <li>{@link #onMappings(Iterator)}</li>
+ * <li>{@link #onTypes(Iterator)}</li>
+ * <li>{@link #onCacheChange(Iterator)}</li>
+ * <li>{@link #onCacheDestroy(Iterator)}</li>
+ * </ul>
+ * Note, {@link CdcConsumer} receives notifications on each running CDC
application (node).
*
* @see CdcMain
* @see CdcEvent
@@ -77,7 +87,6 @@ public interface CdcConsumer {
/**
* Handles new binary types. State of the types processing will be stored
after method invocation
* and ongoing notifications after CDC application fail/restart will be
continued for newly created/updates types.
- * Invoked before {@link #onEvents(Iterator)}.
*
* Note, unlike {@link #onEvents(Iterator)} this method MUST process all
types or CDC will fail.
* Because, in time of invocation {@link #onEvents(Iterator)} all changed
types must be available on destionation.
@@ -94,7 +103,6 @@ public interface CdcConsumer {
/**
* Handles new mappings from type name to id. State of the types
processing will be stored after method invocation
* and ongoing notifications after CDC application fail/restart will be
continued for newly created/updates mappings.
- * Invoked before both {@link #onEvents(Iterator)} and {@link
#onTypes(Iterator)}.
*
* @param mappings Binary mapping iterator.
* @see IgniteBinary
@@ -103,6 +111,28 @@ public interface CdcConsumer {
*/
public void onMappings(Iterator<TypeMapping> mappings);
+ /**
+ * Handles cache change (create, edit) events. State of cache processing
will be stored after method invocation
+ * and ongoing notifications after CDC application fail/restart will be
continued for newly changed caches.
+ *
+ * @param cacheEvents Cache change events.
+ * @see Ignite#createCache(String)
+ * @see Ignite#getOrCreateCache(String)
+ * @see CdcCacheEvent
+ */
+ public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents);
+
+ /**
+ * Handles cache destroy events. State of cache processing will be stored
after method invocation
+ * and ongoing notifications after CDC application fail/restart will be
continued for newly destroyed caches.
+ *
+ * @param caches Destroyed caches.
+ * @see Ignite#destroyCache(String)
+ * @see CdcCacheEvent
+ * @see CacheView#cacheId()
+ */
+ public void onCacheDestroy(Iterator<Integer> caches);
+
/**
* Stops the consumer.
* This method can be invoked only after {@link #start(MetricRegistry)}.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index e00391cab71..379c2bb6f95 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -68,6 +68,9 @@ public class CdcConsumerState {
/** */
public static final String MAPPINGS_STATE_FILE_NAME = "cdc-mappings-state"
+ FILE_SUFFIX;
+ /** */
+ public static final String CACHES_STATE_FILE_NAME = "cdc-caches-state" +
FILE_SUFFIX;
+
/** Log. */
private final IgniteLogger log;
@@ -89,6 +92,12 @@ public class CdcConsumerState {
/** Mappings types state file. */
private final Path tmpMappings;
+ /** Cache state file. */
+ private final Path caches;
+
+ /** Temporary caches state file. */
+ private final Path tmpCaches;
+
/**
* @param stateDir State directory.
*/
@@ -100,6 +109,8 @@ public class CdcConsumerState {
tmpTypes = stateDir.resolve(TYPES_STATE_FILE_NAME + TMP_SUFFIX);
mappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME);
tmpMappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME + TMP_SUFFIX);
+ caches = stateDir.resolve(CACHES_STATE_FILE_NAME);
+ tmpCaches = stateDir.resolve(CACHES_STATE_FILE_NAME + TMP_SUFFIX);
}
/**
@@ -179,6 +190,32 @@ public class CdcConsumerState {
return state;
}
+ /**
+ * Loads CDC caches state from file.
+ *
+ * @return Saved state.
+ */
+ public Map<Integer, Long> loadCaches() {
+ Map<Integer, Long> state = load(caches, HashMap::new);
+
+ log.info("Initial caches state loaded [cachesCnt=" + state.size() +
']');
+
+ if (log.isDebugEnabled()) {
+ for (Map.Entry<Integer, Long> entry : state.entrySet())
+ log.debug("Cache [cacheId=" + entry.getKey() + ",
lastModified=" + entry.getValue() + ']');
+ }
+
+ return state;
+ }
+
+ /**
+ * Saves caches state to file.
+ * @param cachesState State of caches.
+ */
+ public void saveCaches(Map<Integer, Long> cachesState) throws IOException {
+ save(cachesState, tmpCaches, caches);
+ }
+
/**
* Loads types mappings state from file.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index dfbf9a0b712..91abaa7faa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
@@ -49,6 +50,7 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
@@ -73,6 +76,10 @@ import static
org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static
org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static
org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -185,6 +192,9 @@ public class CdcMain implements Runnable {
/** Change Data Capture directory. */
private Path cdcDir;
+ /** Database directory. */
+ private File dbDir;
+
/** Binary meta directory. */
private File binaryMeta;
@@ -206,6 +216,9 @@ public class CdcMain implements Runnable {
/** Mappings state. */
private Set<T2<Integer, Byte>> mappingsState;
+ /** Caches state. */
+ private Map<Integer, Long> cachesState;
+
/** Stopped flag. */
private volatile boolean stopped;
@@ -292,6 +305,7 @@ public class CdcMain implements Runnable {
walState = state.loadWalState();
typesState = state.loadTypesState();
mappingsState = state.loadMappingsState();
+ cachesState = state.loadCaches();
if (walState != null) {
committedSegmentIdx.value(walState.get1().index());
@@ -404,7 +418,7 @@ public class CdcMain implements Runnable {
AtomicLong lastSgmnt = new AtomicLong(-1);
while (!stopped) {
- try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+ try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
Set<Path> exists = new HashSet<>();
cdcFiles
@@ -423,6 +437,9 @@ public class CdcMain implements Runnable {
.forEach(this::consumeSegment); // Consuming segments.
seen.removeIf(p -> !exists.contains(p)); // Clean up seen
set.
+
+ if (lastSgmnt.get() == -1) //Forcefully updating metadata
if no new segments found.
+ updateMetadata();
}
if (!stopped)
@@ -436,11 +453,11 @@ public class CdcMain implements Runnable {
/** Reads all available records from segment. */
private void consumeSegment(Path segment) {
+ updateMetadata();
+
if (log.isInfoEnabled())
log.info("Processing WAL segment [segment=" + segment + ']');
- updateMetadata();
-
IgniteWalIteratorFactory.IteratorParametersBuilder builder =
new IgniteWalIteratorFactory.IteratorParametersBuilder()
.log(log)
@@ -548,17 +565,24 @@ public class CdcMain implements Runnable {
updateTypes();
+ updateCaches();
+
metaUpdate.value(System.currentTimeMillis() - start);
}
/** Search for new or changed {@link BinaryType} and notifies the
consumer. */
private void updateTypes() {
try {
- Iterator<BinaryType> changedTypes = Files.list(binaryMeta.toPath())
+ File[] files = binaryMeta.listFiles();
+
+ if (files == null)
+ return;
+
+ Iterator<BinaryType> changedTypes = Arrays.stream(files)
.filter(p -> p.toString().endsWith(METADATA_FILE_SUFFIX))
- .map(p -> {
- int typeId =
BinaryUtils.typeId(p.getFileName().toString());
- long lastModified = p.toFile().lastModified();
+ .map(f -> {
+ int typeId = BinaryUtils.typeId(f.getName());
+ long lastModified = f.lastModified();
// Filter out files already in `typesState` with the same
last modify date.
if (typesState.containsKey(typeId) && lastModified ==
typesState.get(typeId))
@@ -638,6 +662,74 @@ public class CdcMain implements Runnable {
}
}
+ /** Search for new or changed {@link CdcCacheEvent} and notifies the
consumer. */
+ private void updateCaches() {
+ try {
+ if (!dbDir.exists())
+ return;
+
+ File[] files = dbDir.listFiles();
+
+ if (files == null)
+ return;
+
+ Set<Integer> destroyed = new HashSet<>(cachesState.keySet());
+
+ Iterator<CdcCacheEvent> cacheEvts = Arrays.stream(files)
+ .filter(f -> f.isDirectory() &&
+ (f.getName().startsWith(CACHE_DIR_PREFIX) ||
f.getName().startsWith(CACHE_GRP_DIR_PREFIX)) &&
+ !f.getName().equals(CACHE_DIR_PREFIX + UTILITY_CACHE_NAME))
+ .filter(File::exists)
+ // Cache group directory can contain several cache data files.
+ // See
GridLocalConfigManager#cacheConfigurationFile(CacheConfiguration<?, ?>)
+ .flatMap(cacheDir -> Arrays.stream(cacheDir.listFiles(f ->
f.getName().endsWith(CACHE_DATA_FILENAME))))
+ .map(f -> {
+ try {
+ CdcCacheEvent evt =
GridLocalConfigManager.readCacheData(
+ f,
+
MarshallerUtils.jdkMarshaller(kctx.igniteInstanceName()),
+ igniteCfg
+ );
+
+ destroyed.remove(evt.cacheId());
+
+ Long lastModified0 = cachesState.get(evt.cacheId());
+
+ if (lastModified0 != null && lastModified0 ==
f.lastModified())
+ return null;
+
+ cachesState.put(evt.cacheId(), f.lastModified());
+
+ return evt;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ })
+ .filter(Objects::nonNull)
+ .iterator();
+
+ consumer.onCacheEvents(cacheEvts);
+
+ if (cacheEvts.hasNext())
+ throw new IllegalStateException("Consumer should handle all
cache change events");
+
+ if (!destroyed.isEmpty()) {
+ Iterator<Integer> destroyedIter = destroyed.iterator();
+
+ consumer.onCacheDestroyEvents(destroyedIter);
+
+ if (destroyedIter.hasNext())
+ throw new IllegalStateException("Consumer should handle
all cache destroy events");
+ }
+
+ state.saveCaches(cachesState);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/**
* Try locks Change Data Capture directory.
*
@@ -645,6 +737,13 @@ public class CdcMain implements Runnable {
* @return Lock or null if lock failed.
*/
private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
+ if (!dbStoreDirWithSubdirectory.exists()) {
+ log.warning("DB store directory not exists. Should be created by
Ignite Node " +
+ " [dir=" + dbStoreDirWithSubdirectory + ']');
+
+ return null;
+ }
+
File cdcRoot = new
File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());
if (!cdcRoot.isAbsolute()) {
@@ -671,6 +770,7 @@ public class CdcMain implements Runnable {
}
this.cdcDir = cdcDir;
+ this.dbDir = dbStoreDirWithSubdirectory;
CdcFileLockHolder lock = new CdcFileLockHolder(cdcDir.toString(),
"cdc.lock", log);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 7a3c4a1b517..1a752970bca 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
@@ -155,6 +156,24 @@ public class WalRecordsConsumer<K, V> {
consumer.onMappings(mappings);
}
+ /**
+ * Handles new cache events.
+ *
+ * @param cacheEvts Cache events iterator.
+ */
+ public void onCacheEvents(Iterator<CdcCacheEvent> cacheEvts) {
+ consumer.onCacheChange(cacheEvts);
+ }
+
+ /**
+ * Handles destroy cache events.
+ *
+ * @param caches Destroyed cache iterator.
+ */
+ public void onCacheDestroyEvents(Iterator<Integer> caches) {
+ consumer.onCacheDestroy(caches);
+ }
+
/**
* Starts the consumer.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 30f058a46ed..957ff0b58ac 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -208,14 +208,4 @@ public interface IgnitePageStoreManager extends
GridCacheSharedManager, IgniteCh
* @param cleanFiles {@code True} to delete all persisted files related to
particular store.
*/
public void cleanupPageStoreIfMatch(Predicate<Integer> cacheGrpPred,
boolean cleanFiles);
-
- /**
- * Creates and initializes cache work directory retrieved from {@code
cacheCfg}.
- *
- * @param cacheCfg Cache configuration.
- * @return {@code True} if work directory already exists.
- *
- * @throws IgniteCheckedException If failed.
- */
- public boolean checkAndInitCacheWorkDir(CacheConfiguration cacheCfg)
throws IgniteCheckedException;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 31aa0ac4693..65868e2a94c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -249,7 +249,7 @@ public class CachesRegistry {
registerCache(cacheDesc);
List<DynamicCacheDescriptor> cachesToPersist =
cacheDescriptors.stream()
- .filter(cacheDesc -> shouldPersist(cacheDesc.cacheConfiguration()))
+ .filter(cacheDesc -> CU.storeCacheConfig(cctx,
cacheDesc.cacheConfiguration()))
.collect(Collectors.toList());
if (cachesToPersist.isEmpty())
@@ -262,18 +262,6 @@ public class CachesRegistry {
return cachesConfPersistFuture =
persistCacheConfigurations(cacheConfigsToPersist);
}
- /**
- * Checks whether given cache configuration should be persisted.
- *
- * @param cacheCfg Cache config.
- * @return {@code True} if cache configuration should be persisted, {@code
false} in other case.
- */
- private boolean shouldPersist(CacheConfiguration<?, ?> cacheCfg) {
- return cctx.pageStore() != null &&
- CU.isPersistentCache(cacheCfg,
cctx.gridConfig().getDataStorageConfiguration()) &&
- !cctx.kernalContext().clientNode();
- }
-
/**
* Persists cache configurations.
*
@@ -284,7 +272,10 @@ public class CachesRegistry {
// Pre-create cache work directories if they don't exist.
for (StoredCacheData data : cacheConfigsToPersist) {
try {
- cctx.pageStore().checkAndInitCacheWorkDir(data.config());
+ FilePageStoreManager.checkAndInitCacheWorkDir(
+ cctx.cache().configManager().cacheWorkDir(data.config()),
+ log
+ );
}
catch (IgniteCheckedException e) {
if (!cctx.kernalContext().isStopping()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 25728dcd104..98c00118710 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1118,9 +1118,7 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
U.stopLifecycleAware(log, lifecycleAwares(ctx.group(),
cache.configuration(), ctx.store().configuredStore()));
- IgnitePageStoreManager pageStore;
-
- if (callDestroy && (pageStore = sharedCtx.pageStore()) != null) {
+ if (callDestroy && CU.storeCacheConfig(sharedCtx, ctx.config())) {
try {
locCfgMgr.removeCacheData(new
StoredCacheData(ctx.config()));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d0be19c626e..96123a3856d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -2118,6 +2118,22 @@ public class GridCacheUtils {
return false;
}
+ /**
+ * Checks whether given cache configuration should be persisted.
+ *
+ * @param cacheCfg Cache config.
+ * @return {@code True} if cache configuration should be persisted, {@code
false} in other case.
+ */
+ public static boolean storeCacheConfig(GridCacheSharedContext<?, ?> cctx,
CacheConfiguration<?, ?> cacheCfg) {
+ if (cctx.kernalContext().clientNode())
+ return false;
+
+ DataRegionConfiguration drCfg =
+ findDataRegion(cctx.gridConfig().getDataStorageConfiguration(),
cacheCfg.getDataRegionName());
+
+ return drCfg != null && (drCfg.isPersistenceEnabled() ||
drCfg.isCdcEnabled());
+ }
+
/**
* @param pageSize Page size.
* @param encSpi Encryption spi.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
index ed69095434e..6cc56335480 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -43,15 +43,16 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.util.typedef.F;
@@ -64,7 +65,6 @@ import org.apache.ignite.marshaller.MarshallerUtils;
import static java.nio.file.Files.newDirectoryStream;
import static
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
-import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
@@ -187,29 +187,12 @@ public class GridLocalConfigManager {
}
/**
- * @param dir Cache (group) directory.
- * @param ccfgs Cache configurations.
+ * @param conf File with stored cache data.
+ * @return Cache data.
* @throws IgniteCheckedException If failed.
*/
- public void readCacheConfigurations(File dir, Map<String, StoredCacheData>
ccfgs) throws IgniteCheckedException {
- if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
- File conf = new File(dir, CACHE_DATA_FILENAME);
-
- if (conf.exists() && conf.length() > 0) {
- StoredCacheData cacheData = readCacheData(conf);
-
- String cacheName = cacheData.config().getName();
-
- if (!ccfgs.containsKey(cacheName))
- ccfgs.put(cacheName, cacheData);
- else {
- U.warn(log, "Cache with name=" + cacheName + " is already
registered, skipping config file "
- + dir.getName());
- }
- }
- }
- else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
- readCacheGroupCaches(dir, ccfgs);
+ public StoredCacheData readCacheData(File conf) throws
IgniteCheckedException {
+ return readCacheData(conf, marshaller, ctx.config());
}
/**
@@ -217,9 +200,13 @@ public class GridLocalConfigManager {
* @return Cache data.
* @throws IgniteCheckedException If failed.
*/
- public StoredCacheData readCacheData(File conf) throws
IgniteCheckedException {
+ public static StoredCacheData readCacheData(
+ File conf,
+ Marshaller marshaller,
+ IgniteConfiguration cfg
+ ) throws IgniteCheckedException {
try (InputStream stream = new BufferedInputStream(new
FileInputStream(conf))) {
- return marshaller.unmarshal(stream,
U.resolveClassLoader(ctx.config()));
+ return marshaller.unmarshal(stream, U.resolveClassLoader(cfg));
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteCheckedException("An error occurred during cache
configuration loading from file [file=" +
@@ -255,19 +242,14 @@ public class GridLocalConfigManager {
) throws IgniteCheckedException {
assert cacheData != null;
- GridCacheSharedContext<Object, Object> sharedContext =
cacheProcessor.context();
-
- boolean shouldStore = sharedContext.pageStore() != null
- && !sharedContext.kernalContext().clientNode()
- && isPersistentCache(cacheData.config(),
sharedContext.gridConfig().getDataStorageConfiguration());
+ CacheConfiguration<?, ?> ccfg = cacheData.config();
- if (!shouldStore)
+ if (!CU.storeCacheConfig(cacheProcessor.context(), ccfg))
return;
- CacheConfiguration<?, ?> ccfg = cacheData.config();
File cacheWorkDir = cacheWorkDir(ccfg);
- cacheProcessor.context().pageStore().checkAndInitCacheWorkDir(ccfg);
+ FilePageStoreManager.checkAndInitCacheWorkDir(cacheWorkDir, log);
assert cacheWorkDir.exists() : "Work directory does not exist: " +
cacheWorkDir;
@@ -355,7 +337,7 @@ public class GridLocalConfigManager {
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new
HashMap<>();
- restoreCaches(caches, templates, ctx.config(),
ctx.cache().context().pageStore());
+ restoreCaches(caches, templates, ctx.config());
CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
IgniteUuid.randomUuid(),
@@ -423,19 +405,79 @@ public class GridLocalConfigManager {
return;
for (File file : files) {
- if (!file.isDirectory() &&
file.getName().endsWith(CACHE_DATA_FILENAME) && file.length() > 0) {
- StoredCacheData cacheData = readCacheData(file);
+ if (!file.isDirectory() &&
file.getName().endsWith(CACHE_DATA_FILENAME) && file.length() > 0)
+ readAndAdd(
+ ccfgs,
+ file,
+ cacheName -> "Cache with name=" + cacheName + " is already
registered, " +
+ "skipping config file " + file.getName() + " in group
directory " + grpDir.getName()
+ );
+ }
+ }
- String cacheName = cacheData.config().getName();
+ /**
+ * @param dir Cache (group) directory.
+ * @param ccfgs Cache configurations.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void readCacheConfigurations(File dir, Map<String, StoredCacheData>
ccfgs) throws IgniteCheckedException {
+ if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
+ File conf = new File(dir, CACHE_DATA_FILENAME);
- if (!ccfgs.containsKey(cacheName))
- ccfgs.put(cacheName, cacheData);
- else {
- U.warn(log, "Cache with name=" + cacheName + " is already
registered, skipping config file "
- + file.getName() + " in group directory " +
grpDir.getName());
- }
+ if (conf.exists() && conf.length() > 0) {
+ readAndAdd(
+ ccfgs,
+ conf,
+ cache -> "Cache with name=" + cache + " is already
registered, skipping config file " + dir.getName()
+ );
}
}
+ else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+ readCacheGroupCaches(dir, ccfgs);
+ }
+
+ /**
+ * @param ccfgs Loaded configurations.
+ * @param file Stored cache data file.
+ * @param msg Warning message producer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void readAndAdd(
+ Map<String, StoredCacheData> ccfgs,
+ File file,
+ Function<String, String> msg
+ ) throws IgniteCheckedException {
+ StoredCacheData cacheData = readCacheData(file, marshaller,
ctx.config());
+
+ String cacheName = cacheData.config().getName();
+
+ // In-memory CDC stored data must be removed on node failover.
+ if (inMemoryCdcCache(cacheData.config())) {
+ removeCacheData(cacheData);
+
+ U.warn(
+ log,
+ "Stored data for in-memory CDC cache removed[name=" +
cacheName + ", file=" + file.getName() + ']'
+ );
+
+ return;
+ }
+
+ if (!ccfgs.containsKey(cacheName))
+ ccfgs.put(cacheName, cacheData);
+ else
+ U.warn(log, msg.apply(cacheName));
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @return {@code True} if cache is placed in an in-memory, CDC-enabled data
region.
+ */
+ private boolean inMemoryCdcCache(CacheConfiguration<?, ?> cfg) {
+ DataRegionConfiguration drCfg =
+ CU.findDataRegion(ctx.config().getDataStorageConfiguration(),
cfg.getDataRegionName());
+
+ return drCfg != null && !drCfg.isPersistenceEnabled() &&
drCfg.isCdcEnabled();
}
/**
@@ -453,7 +495,7 @@ public class GridLocalConfigManager {
* @param ccfg Cache configuration.
* @return Store dir for given cache.
*/
- private File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
+ public File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
return FilePageStoreManager.cacheWorkDir(storeWorkDir,
FilePageStoreManager.cacheDirName(ccfg));
}
@@ -476,18 +518,16 @@ public class GridLocalConfigManager {
/**
* @param caches Caches accumulator.
* @param templates Templates accumulator.
- * @param config Ignite configuration.
- * @param pageStoreManager Page store manager.
+ * @param igniteCfg Ignite configuration.
*/
private void restoreCaches(
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
- IgniteConfiguration config,
- IgnitePageStoreManager pageStoreManager
+ IgniteConfiguration igniteCfg
) throws IgniteCheckedException {
- assert !config.isDaemon() : "Trying to restore cache configurations on
daemon node.";
+ assert !igniteCfg.isDaemon() : "Trying to restore cache configurations
on daemon node.";
- CacheConfiguration[] cfgs = config.getCacheConfiguration();
+ CacheConfiguration[] cfgs = igniteCfg.getCacheConfiguration();
for (int i = 0; i < cfgs.length; i++) {
CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
@@ -498,7 +538,7 @@ public class GridLocalConfigManager {
addCacheFromConfiguration(cfg, false, caches, templates);
}
- if (CU.isPersistenceEnabled(config) && pageStoreManager != null) {
+ if ((CU.isPersistenceEnabled(igniteCfg) &&
ctx.cache().context().pageStore() != null) || CU.isCdcEnabled(igniteCfg)) {
Map<String, StoredCacheData> storedCaches =
readCacheConfigurations();
if (!F.isEmpty(storedCaches)) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
index 08328d76d3c..bc2c835cf3f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.Collection;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -35,7 +37,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
* This class is {@link Serializable} and is intended to be read-written with
{@link JdkMarshaller}
* in order to be serialization wise agnostic to further additions or removals
of fields.
*/
-public class StoredCacheData implements Serializable {
+public class StoredCacheData implements Serializable, CdcCacheEvent {
/** */
private static final long serialVersionUID = 0L;
@@ -102,7 +104,7 @@ public class StoredCacheData implements Serializable {
/**
* @return Query entities.
*/
- public Collection<QueryEntity> queryEntities() {
+ @Override public Collection<QueryEntity> queryEntities() {
return qryEntities;
}
@@ -202,4 +204,14 @@ public class StoredCacheData implements Serializable {
@Override public String toString() {
return S.toString(StoredCacheData.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public int cacheId() {
+ return CU.cacheId(ccfg.getName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheConfiguration<?, ?> configuration() {
+ return ccfg;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index d25458bf116..75ba6cce6c0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1190,7 +1190,20 @@ public class IgniteCacheDatabaseSharedManager extends
GridCacheSharedManagerAdap
* @param stoppedGrps A collection of tuples (cache group, destroy flag).
*/
public void
onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>>
stoppedGrps) {
- // No-op.
+ for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
+ CacheGroupContext grp = tup.get1();
+
+ try {
+ boolean destroy = tup.get2();
+
+ if (destroy && CU.storeCacheConfig(cctx, grp.config()))
+
cctx.cache().configManager().removeCacheGroupConfigurationData(grp);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to gracefully clean page store resources
for destroyed cache " +
+ "[cache=" + grp.cacheOrGroupName() + "]", e);
+ }
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index aa332e2ba7c..02c1304a320 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -181,7 +181,7 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new
ConcurrentHashMap<Integer, Boolean>());
/** */
- private final GridStripedReadWriteLock initDirLock =
+ private static final GridStripedReadWriteLock initDirLock =
new
GridStripedReadWriteLock(Math.max(Runtime.getRuntime().availableProcessors(),
8));
/**
@@ -665,7 +665,7 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
PageMetrics pageMetrics,
boolean encrypted) throws IgniteCheckedException {
try {
- boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
+ boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir, log);
if (dirExisted) {
MaintenanceRegistry mntcReg =
cctx.kernalContext().maintenanceRegistry();
@@ -739,15 +739,10 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
return partId == INDEX_PARTITION ? INDEX_FILE_NAME :
format(PART_FILE_TEMPLATE, partId);
}
- /** {@inheritDoc} */
- @Override public boolean checkAndInitCacheWorkDir(CacheConfiguration
cacheCfg) throws IgniteCheckedException {
- return checkAndInitCacheWorkDir(cacheWorkDir(cacheCfg));
- }
-
/**
* @param cacheWorkDir Cache work directory.
*/
- private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws
IgniteCheckedException {
+ public static boolean checkAndInitCacheWorkDir(File cacheWorkDir,
IgniteLogger log) throws IgniteCheckedException {
boolean dirExisted = false;
ReadWriteLock lock =
initDirLock.getLock(cacheWorkDir.getName().hashCode());
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index 10d4a8c5176..c37d1aa40cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -276,9 +277,12 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
/** */
public abstract static class TestCdcConsumer<T> implements CdcConsumer {
- /** Keys */
+ /** Keys. */
final ConcurrentMap<IgniteBiTuple<ChangeEventType, Integer>, List<T>>
data = new ConcurrentHashMap<>();
+ /** Cache events. */
+ protected final ConcurrentMap<Integer, CdcCacheEvent> caches = new
ConcurrentHashMap<>();
+
/** */
private volatile boolean stopped;
@@ -302,6 +306,8 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()),
k -> new ArrayList<>()).add(extract(evt));
+ assertTrue(caches.containsKey(evt.cacheId()));
+
checkEvent(evt);
});
@@ -313,6 +319,20 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
types.forEachRemaining(t -> assertNotNull(t));
}
+ /** {@inheritDoc} */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts)
{
+ cacheEvts.forEachRemaining(evt -> {
+ assertFalse(caches.containsKey(evt.cacheId()));
+
+ caches.put(evt.cacheId(), evt);
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ caches.forEachRemaining(cacheId ->
assertNotNull(this.caches.remove(cacheId)));
+ }
+
/** */
public abstract void checkEvent(CdcEvent evt);
@@ -392,6 +412,57 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
}
}
+ /** */
+ public static class TrackCacheEventsConsumer implements CdcConsumer {
+ /** Cache events. */
+ public final Map<Integer, CdcCacheEvent> evts = new
ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+ cacheEvents.forEachRemaining(e -> {
+ log.info("TrackCacheEventsConsumer.add[cacheId=" + e.cacheId()
+ ", e=" + e + ']');
+ evts.put(e.cacheId(), e);
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ caches.forEachRemaining(cacheId -> {
+ log.info("TrackCacheEventsConsumer.remove[cacheId=" + cacheId
+ ']');
+
+ evts.remove(cacheId);
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+ evts.forEachRemaining(e -> { /* No-op. */ });
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ types.forEachRemaining(e -> { /* No-op. */ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ mappings.forEachRemaining(e -> { /* No-op. */ });
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
+ }
+
/** */
protected static User createUser(int i) {
byte[] bytes = new byte[1024];
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
new file mode 100644
index 00000000000..5895c64273d
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests that stored cache data for in-memory CDC caches is cleared on node restart.
+ */
+public class CdcCacheConfigOnRestartTest extends AbstractCdcTest {
+ /** Ignite node. */
+ private IgniteEx node;
+
+ /** CDC future. */
+ private IgniteInternalFuture<?> cdcFut;
+
+ /** Consumer. */
+ private TrackCacheEventsConsumer cnsmr;
+
+ /** */
+ public static final String PERSISTENCE = "persistence";
+
+ /** */
+ public static final String IN_MEMORY_CDC = "in-memory-cdc";
+
+ /** */
+ public static final String PURE_IN_MEMORY = "pure-in-memory";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ DataRegionConfiguration persistenceDr = new DataRegionConfiguration()
+ .setName(PERSISTENCE)
+ .setPersistenceEnabled(true);
+
+ DataRegionConfiguration inMemoryCdc = new DataRegionConfiguration()
+ .setName(IN_MEMORY_CDC)
+ .setPersistenceEnabled(false)
+ .setCdcEnabled(true);
+
+ DataRegionConfiguration pureInMemory = new DataRegionConfiguration()
+ .setName(PURE_IN_MEMORY)
+ .setPersistenceEnabled(false)
+ .setCdcEnabled(false);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDataRegionConfigurations(persistenceDr, inMemoryCdc,
pureInMemory));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ node = startGrid();
+
+ node.cluster().state(ClusterState.ACTIVE);
+
+ cnsmr = new TrackCacheEventsConsumer();
+
+ cdcFut = runAsync(createCdc(cnsmr, node.configuration()));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ if (cdcFut != null) {
+ assertFalse(cdcFut.isDone());
+
+ cdcFut.cancel();
+ }
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testInMemoryCdcClearedOnRestart() throws Exception {
+ String pureInMemoryCache = "grouped-pure-in-memory-cache";
+ String inMemoryCdcCache = "grouped-in-memory-cdc-cache";
+
+ node.createCache(pureInMemory(PURE_IN_MEMORY));
+ node.createCache(pureInMemory(pureInMemoryCache).setGroupName("grp"));
+ node.createCache(inMemoryCdc(IN_MEMORY_CDC));
+ node.createCache(inMemoryCdc(inMemoryCdcCache).setGroupName("grp2"));
+
+ assertTrue(waitForCondition(
+ () -> cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC))
+ && cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache)),
+ getTestTimeout()
+ ));
+
+ // Pure in-memory caches must not store data.
+ assertFalse(cnsmr.evts.containsKey(CU.cacheId(PURE_IN_MEMORY)));
+ assertFalse(cnsmr.evts.containsKey(CU.cacheId(pureInMemoryCache)));
+
+ stopAllGrids();
+ node = startGrid();
+
+ // Cache config must be removed on node restart.
+ assertTrue(waitForCondition(() -> cnsmr.evts.isEmpty(),
getTestTimeout()));
+ }
+
+ /** */
+ @Test
+ public void testPersistenceNotClearedOnRestart() throws Exception {
+ String persistenceCache = "grouped-persistence-cache";
+ String persistenceCache2 = "persistence-2";
+ String inMemoryCdcCache = "grouped-in-memory-cdc-cache";
+
+ node.createCache(persistence(PERSISTENCE));
+ node.createCache(persistence(persistenceCache).setGroupName("grp"));
+ node.createCache(inMemoryCdc(IN_MEMORY_CDC));
+ node.createCache(inMemoryCdc(inMemoryCdcCache).setGroupName("grp2"));
+
+ assertTrue(waitForCondition(
+ () -> cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC))
+ && cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache))
+ && cnsmr.evts.containsKey(CU.cacheId(PERSISTENCE))
+ && cnsmr.evts.containsKey(CU.cacheId(persistenceCache)),
+ getTestTimeout()
+ ));
+
+ stopAllGrids();
+ node = startGrid();
+
+ // Create one more cache to know for sure CDC updated cache data.
+ node.createCache(persistence(persistenceCache2));
+
+ assertTrue(waitForCondition(() ->
cnsmr.evts.containsKey(CU.cacheId(persistenceCache2)), getTestTimeout()));
+
+ // Cache config for in-memory CDC caches must be removed on node
restart.
+ assertFalse(cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC)));
+ assertFalse(cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache)));
+
+ assertTrue(cnsmr.evts.containsKey(CU.cacheId(PERSISTENCE)));
+ assertTrue(cnsmr.evts.containsKey(CU.cacheId(persistenceCache)));
+
+ assertEquals(3, cnsmr.evts.size());
+ }
+
+ /** */
+ private CacheConfiguration<?, ?> persistence(String cacheName) {
+ return new CacheConfiguration<Integer,
Integer>(cacheName).setDataRegionName(PERSISTENCE);
+ }
+
+ /** */
+ private CacheConfiguration<?, ?> inMemoryCdc(String cacheName) {
+ return new CacheConfiguration<Integer,
Integer>(cacheName).setDataRegionName(IN_MEMORY_CDC);
+ }
+
+ /** */
+ private CacheConfiguration<?, ?> pureInMemory(String cacheName) {
+ return new CacheConfiguration<Integer,
Integer>(cacheName).setDataRegionName(PURE_IN_MEMORY);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index a5eed340c5e..eda1082d0d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -284,6 +284,15 @@ public class CdcSelfTest extends AbstractCdcTest {
mappings.forEachRemaining(m -> assertNotNull(m));
}
+ @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+ cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches)
{
+ caches.forEachRemaining(ce -> assertNotNull(ce));
+ }
+
@Override public void stop() {
// No-op.
}
@@ -367,6 +376,14 @@ public class CdcSelfTest extends AbstractCdcTest {
mappings.forEachRemaining(m -> assertNotNull(m));
}
+ @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+ cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
+ }
+
+ @Override public void onCacheDestroy(Iterator<Integer> caches)
{
+ caches.forEachRemaining(ce -> assertNotNull(ce));
+ }
+
@Override public void stop() {
// No-op.
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
index 33d6f11c41b..03312bbb0ec 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
@@ -217,9 +217,4 @@ public class NoOpPageStoreManager implements
IgnitePageStoreManager {
@Override public void cleanupPageStoreIfMatch(Predicate<Integer>
cacheGrpPred, boolean cleanFiles) {
// No-op.
}
-
- /** {@inheritDoc} */
- @Override public boolean checkAndInitCacheWorkDir(CacheConfiguration
cacheCfg) throws IgniteCheckedException {
- return false;
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 96a1ec075a0..ad5fc72e8b1 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cdc.CdcCacheConfigOnRestartTest;
import org.apache.ignite.cdc.CdcCacheVersionTest;
import org.apache.ignite.cdc.CdcSelfTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
@@ -152,6 +153,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CdcCacheConfigOnRestartTest.class, ignoredTests);
// new style folders with generated consistent ID test
GridTestUtils.addTestIfNeeded(suite,
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
new file mode 100644
index 00000000000..de553ec1d91
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.internal.cdc.SqlCdcTest.executeSql;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests cache events for CDC.
+ */
+@RunWith(Parameterized.class)
+public class CacheEventsCdcTest extends AbstractCdcTest {
+ /** Ignite node. */
+ private IgniteEx node;
+
+ /** */
+ private IgniteCache<Integer, Integer> dummy;
+
+ /** CDC future. */
+ private IgniteInternalFuture<?> cdcFut;
+
+ /** Consumer. */
+ private TrackCacheEventsConsumer cnsmr;
+
+ /** */
+ @Parameterized.Parameter
+ public boolean persistenceEnabled;
+
+ /** */
+ @Parameterized.Parameters(name = "persistence={0}")
+ public static Object[] parameters() {
+ return new Object[] {true, false};
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(persistenceEnabled)
+ .setCdcEnabled(true)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ node = startGrid();
+
+ node.cluster().state(ClusterState.ACTIVE);
+
+ cnsmr = new TrackCacheEventsConsumer();
+
+ cdcFut = runAsync(createCdc(cnsmr, node.configuration()));
+
+ dummy = node.getOrCreateCache("dummy");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ if (cdcFut != null) {
+ assertFalse(cdcFut.isDone());
+
+ cdcFut.cancel();
+ }
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testCreateDestroyCache() throws Exception {
+ node.createCache(DEFAULT_CACHE_NAME);
+
+ assertTrue(waitForCondition(() ->
cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+
+ node.destroyCache(DEFAULT_CACHE_NAME);
+
+ assertTrue(waitForCondition(() ->
!cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+ }
+
+ /** */
+ @Test
+ public void testCreateDestroyCachesInGroup() throws Exception {
+ String otherCache = "other-cache";
+
+ node.createCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME).setGroupName("group"));
+ node.createCache(new CacheConfiguration<Integer,
Integer>(otherCache).setGroupName("group"));
+
+ dummy.put(1, 1); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(() ->
cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+ assertTrue(waitForCondition(() ->
cnsmr.evts.containsKey(CU.cacheId(otherCache)), getTestTimeout()));
+
+ node.destroyCache(DEFAULT_CACHE_NAME);
+
+ dummy.put(2, 2); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(() ->
!cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+ assertTrue(cnsmr.evts.containsKey(CU.cacheId(otherCache)));
+
+ node.destroyCache(otherCache);
+
+ dummy.put(3, 3); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(() ->
!cnsmr.evts.containsKey(CU.cacheId(otherCache)), getTestTimeout()));
+ assertFalse(cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)));
+ }
+
+ /** */
+ @Test
+ public void testCreateDropSQLTable() throws Exception {
+ AtomicReference<QueryEntity> fromCfg = new AtomicReference<>();
+
+ Function<Integer, GridAbsPredicate> checker = fldCnt -> () -> {
+ CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId("T1"));
+
+ if (evt == null)
+ return false;
+
+ assertNotNull(evt.configuration().getQueryEntities());
+ assertNotNull(evt.queryEntities());
+
+ assertEquals(1, evt.queryEntities().size());
+
+ QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+ if (qryEntity.getFields().size() != fldCnt)
+ return false;
+
+ QueryEntity fromCfg0 =
evt.configuration().getQueryEntities().iterator().next();
+
+ fromCfg.set(fromCfg0);
+
+ assertTrue(qryEntity.getFields().containsKey("ID"));
+ assertTrue(qryEntity.getFields().containsKey("NAME"));
+
+ assertTrue(fromCfg0.getFields().containsKey("ID"));
+ assertTrue(fromCfg0.getFields().containsKey("NAME"));
+
+ assertFalse(
+ "Saved config must not change on schema change",
+ fromCfg0.getFields().containsKey("CITY_ID")
+ );
+
+ if (fldCnt == 3)
+ assertTrue(qryEntity.getFields().containsKey("CITY_ID"));
+
+ assertTrue(qryEntity.getIndexes().isEmpty());
+ assertTrue(fromCfg0.getIndexes().isEmpty());
+
+ return true;
+ };
+
+ executeSql(node, "CREATE TABLE T1(ID INT, NAME VARCHAR, PRIMARY KEY
(ID)) WITH \"CACHE_NAME=T1\"");
+
+ dummy.put(1, 1); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(checker.apply(2), getTestTimeout()));
+
+ executeSql(node, "ALTER TABLE T1 ADD COLUMN CITY_ID INT");
+
+ dummy.put(2, 2); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(checker.apply(3), getTestTimeout()));
+
+ executeSql(node, "CREATE INDEX I1 ON T1(CITY_ID)");
+
+ dummy.put(3, 3); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(() -> {
+ CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId("T1"));
+
+ QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+ if (F.isEmpty(qryEntity.getIndexes()))
+ return false;
+
+ QueryEntity fromCfg0 =
evt.configuration().getQueryEntities().iterator().next();
+
+ assertEquals(fromCfg.get(), fromCfg0);
+ assertTrue(fromCfg0.getIndexes().isEmpty());
+
+ QueryIndex idx = qryEntity.getIndexes().iterator().next();
+
+ assertEquals("I1", idx.getName());
+ assertEquals(1, idx.getFields().size());
+ assertEquals("CITY_ID",
idx.getFields().keySet().iterator().next());
+
+ return true;
+ }, getTestTimeout()));
+
+ executeSql(node, "DROP TABLE T1");
+
+ dummy.put(4, 4); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(() ->
!cnsmr.evts.containsKey(CU.cacheId("T1")), getTestTimeout()));
+ }
+
+ /** */
+ @Test
+ public void testCreateTableForExistingCache() throws Exception {
+ Function<Boolean, GridAbsPredicate> checker = chkTblExist -> () -> {
+ CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId(DEFAULT_CACHE_NAME));
+
+ if (evt == null)
+ return false;
+
+ if (!chkTblExist)
+ return true;
+
+ if (F.isEmpty(evt.queryEntities()))
+ return false;
+
+ assertEquals(1, evt.queryEntities().size());
+
+ QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+ if (qryEntity.getFields().size() != 2)
+ return false;
+
+ assertTrue(qryEntity.getFields().containsKey("ID"));
+ assertTrue(qryEntity.getFields().containsKey("NAME"));
+
+ return true;
+ };
+
+ node.createCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME));
+
+ dummy.put(5, 5); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(checker.apply(false), getTestTimeout()));
+
+ executeSql(
+ node,
+ "CREATE TABLE T1(ID INT, NAME VARCHAR, PRIMARY KEY (ID)) WITH
\"CACHE_NAME=" + DEFAULT_CACHE_NAME + "\""
+ );
+
+ dummy.put(6, 6); // Dummy entry to force automatic WAL rollover.
+
+ assertTrue(waitForCondition(checker.apply(true), getTestTimeout()));
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
index db33d1cd9be..0cd6b7964ce 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
@@ -24,8 +24,10 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -72,6 +74,21 @@ public class SqlCdcTest extends AbstractCdcTest {
/** */
public static final String CITY_VAL_TYPE = "TestCity";
+ /** */
+ public static final String ID = "ID";
+
+ /** */
+ public static final String CITY_ID = "CITY_ID";
+
+ /** */
+ public static final String NAME = "NAME";
+
+ /** */
+ public static final String ZIP_CODE = "ZIP_CODE";
+
+ /** */
+ public static final String REGION = "REGION";
+
/** */
@Parameterized.Parameter
public boolean persistenceEnabled;
@@ -95,6 +112,13 @@ public class SqlCdcTest extends AbstractCdcTest {
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
/** Simplest CDC test. */
@Test
public void testReadAllSQLRows() throws Exception {
@@ -118,13 +142,13 @@ public class SqlCdcTest extends AbstractCdcTest {
executeSql(
ign,
"CREATE TABLE USER(id int, city_id int, name varchar, PRIMARY KEY
(id, city_id)) " +
- "WITH \"CACHE_NAME=user,VALUE_TYPE=" + USER_VAL_TYPE +
",KEY_TYPE=" + USER_KEY_TYPE + "\""
+ "WITH \"CACHE_NAME=" + USER + ",VALUE_TYPE=" + USER_VAL_TYPE +
",KEY_TYPE=" + USER_KEY_TYPE + "\""
);
executeSql(
ign,
"CREATE TABLE CITY(id int, name varchar, zip_code varchar(6),
PRIMARY KEY (id)) " +
- "WITH \"CACHE_NAME=city,VALUE_TYPE=TestCity\""
+ "WITH \"CACHE_NAME=" + CITY + ",VALUE_TYPE=TestCity\""
);
for (int i = 0; i < KEYS_CNT; i++) {
@@ -160,7 +184,7 @@ public class SqlCdcTest extends AbstractCdcTest {
cdc = createCdc(cnsmr, cfg);
- IgniteInternalFuture<?> rmvFut = runAsync(cdc);
+ IgniteInternalFuture<?> cdcFut = runAsync(cdc);
waitForSize(KEYS_CNT, USER, DELETE, cnsmr);
@@ -172,13 +196,15 @@ public class SqlCdcTest extends AbstractCdcTest {
ign,
"INSERT INTO CITY VALUES(?, ?, ?, ?)",
KEYS_CNT + 1,
- MSK,
+ SPB,
Integer.toString(127000 + KEYS_CNT + 1),
- "Moscow region");
+ "Saint Petersburg");
waitForSize(KEYS_CNT + 1, CITY, UPDATE, cnsmr);
- rmvFut.cancel();
+ assertFalse(cdcFut.isDone());
+
+ cdcFut.cancel();
assertTrue(cnsmr.stopped());
}
@@ -207,12 +233,12 @@ public class SqlCdcTest extends AbstractCdcTest {
return;
if (evt.cacheId() == cacheId(USER)) {
- int id = ((BinaryObject)evt.key()).field("ID");
- int cityId = ((BinaryObject)evt.key()).field("CITY_ID");
+ int id = ((BinaryObject)evt.key()).field(ID);
+ int cityId = ((BinaryObject)evt.key()).field(CITY_ID);
assertEquals(42 * id, cityId);
- String name = ((BinaryObject)evt.value()).field("NAME");
+ String name = ((BinaryObject)evt.value()).field(NAME);
if (id % 2 == 0)
assertTrue(name.startsWith(JOHN));
@@ -221,8 +247,8 @@ public class SqlCdcTest extends AbstractCdcTest {
}
else {
int id = (Integer)evt.key();
- String name = ((BinaryObject)evt.value()).field("NAME");
- String zipCode = ((BinaryObject)evt.value()).field("ZIP_CODE");
+ String name = ((BinaryObject)evt.value()).field(NAME);
+ String zipCode = ((BinaryObject)evt.value()).field(ZIP_CODE);
assertEquals(Integer.toString(127000 + id), zipCode);
@@ -249,34 +275,34 @@ public class SqlCdcTest extends AbstractCdcTest {
switch (type.typeName()) {
case USER_KEY_TYPE:
-
assertTrue(type.fieldNames().containsAll(Arrays.asList("ID", "CITY_ID")));
+
assertTrue(type.fieldNames().containsAll(Arrays.asList(ID, CITY_ID)));
assertEquals(2, type.fieldNames().size());
- assertEquals(int.class.getSimpleName(),
type.fieldTypeName("ID"));
- assertEquals(int.class.getSimpleName(),
type.fieldTypeName("CITY_ID"));
+ assertEquals(int.class.getSimpleName(),
type.fieldTypeName(ID));
+ assertEquals(int.class.getSimpleName(),
type.fieldTypeName(CITY_ID));
userKeyType = true;
break;
case USER_VAL_TYPE:
- assertTrue(type.fieldNames().contains("NAME"));
+ assertTrue(type.fieldNames().contains(NAME));
assertEquals(1, type.fieldNames().size());
- assertEquals(String.class.getSimpleName(),
type.fieldTypeName("NAME"));
+ assertEquals(String.class.getSimpleName(),
type.fieldTypeName(NAME));
userValType = true;
break;
case CITY_VAL_TYPE:
-
assertTrue(type.fieldNames().containsAll(Arrays.asList("NAME", "ZIP_CODE")));
+
assertTrue(type.fieldNames().containsAll(Arrays.asList(NAME, ZIP_CODE)));
assertEquals(cityValType ? 3 : 2,
type.fieldNames().size());
- assertEquals(String.class.getSimpleName(),
type.fieldTypeName("NAME"));
- assertEquals(String.class.getSimpleName(),
type.fieldTypeName("ZIP_CODE"));
+ assertEquals(String.class.getSimpleName(),
type.fieldTypeName(NAME));
+ assertEquals(String.class.getSimpleName(),
type.fieldTypeName(ZIP_CODE));
// Alter table happen.
if (cityValType) {
- assertTrue(type.fieldNames().contains("REGION"));
- assertEquals(String.class.getSimpleName(),
type.fieldTypeName("REGION"));
+ assertTrue(type.fieldNames().contains(REGION));
+ assertEquals(String.class.getSimpleName(),
type.fieldTypeName(REGION));
}
cityValType = true;
@@ -308,10 +334,49 @@ public class SqlCdcTest extends AbstractCdcTest {
assertEquals(mapper.typeId(typeName), m.typeId());
}
}
+
+ /** {@inheritDoc} Asserts the single query entity of each CITY/USER cache event, then stores the event in {@code caches} by cache id. */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts)
{
+ cacheEvts.forEachRemaining(evt -> {
+ if (evt.configuration().getName().equals(CITY)) {
+ assertNotNull(evt.queryEntities());
+ assertEquals(1, evt.queryEntities().size());
+
+ QueryEntity tbl = evt.queryEntities().iterator().next();
+
+ assertEquals(CITY.toUpperCase(), tbl.getTableName()); // NOTE(review): toUpperCase() uses the default locale; consider Locale.ROOT.
+ assertEquals(caches.containsKey(evt.cacheId()) ? 4 : 3,
tbl.getFields().size()); // 4 fields if an event for this cache id was already stored, 3 otherwise.
+ assertEquals(Integer.class.getName(), tbl.getKeyType());
+ assertEquals(ID, tbl.getKeyFieldName());
+
assertTrue(tbl.getFields().keySet().containsAll(Arrays.asList(ID, NAME,
ZIP_CODE)));
+
+ if (caches.containsKey(evt.cacheId()))
+ assertTrue(tbl.getFields().containsKey(REGION)); // A repeated event for this cache must carry REGION.
+
+ assertEquals((Integer)6,
tbl.getFieldsPrecision().get(ZIP_CODE));
+ }
+ else if (evt.configuration().getName().equals(USER)) {
+ assertNotNull(evt.queryEntities());
+ assertEquals(1, evt.queryEntities().size());
+
+ QueryEntity tbl = evt.queryEntities().iterator().next();
+
+ assertEquals(USER.toUpperCase(), tbl.getTableName());
+ assertEquals(3, tbl.getFields().size());
+ assertEquals(USER_KEY_TYPE, tbl.getKeyType());
+ assertEquals(USER_VAL_TYPE, tbl.getValueType());
+
assertTrue(tbl.getFields().keySet().containsAll(Arrays.asList(ID, CITY_ID,
NAME)));
+ }
+ else
+ fail("Unknown cache[" + evt.configuration().getName() +
']');
+
+ caches.put(evt.cacheId(), evt); // Stored so that a later event for the same cache id takes the "already seen" branches above.
+ });
+ }
}
/** */
- private List<List<?>> executeSql(IgniteEx node, String sqlText, Object...
args) {
+ static List<List<?>> executeSql(IgniteEx node, String sqlText, Object...
args) {
return node.context().query().querySqlFields(new
SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
index 10905a573fa..b182ee0efe1 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistryTest;
+import org.apache.ignite.internal.cdc.CacheEventsCdcTest;
import org.apache.ignite.internal.cdc.SqlCdcTest;
import org.apache.ignite.internal.metric.SystemViewSelfTest;
import org.apache.ignite.internal.processors.cache.BigEntryQueryTest;
@@ -380,7 +381,8 @@ import org.junit.runners.Suite;
IgniteStatisticsTestSuite.class,
// CDC tests.
- SqlCdcTest.class
+ SqlCdcTest.class,
+ CacheEventsCdcTest.class
})
public class IgniteBinaryCacheQueryTestSuite3 {
diff --git
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
index 84ac26bcf88..59e65c4848d 100644
---
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
+++
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
@@ -144,6 +144,16 @@ public class CdcConfigurationTest extends
GridCommonAbstractTest {
// No-Op.
}
+ /** {@inheritDoc} Cache change events are ignored by this implementation. */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+ // No-Op.
+ }
+
+ /** {@inheritDoc} Cache destroy notifications are ignored by this implementation. */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ // No-Op.
+ }
+
/** {@inheritDoc} */
@Override public void stop() {
// No-Op.