IGNITE-6235 Added ability to handle CacheObject from DataRecord in standalone WAL iterator - Fixes #2620.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39b90301 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39b90301 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39b90301 Branch: refs/heads/ignite-6149 Commit: 39b903013f564f4229ff3284e22e46001a4f9b46 Parents: 8936a0d Author: dpavlov <[email protected]> Authored: Fri Sep 15 14:47:03 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Sep 15 14:47:03 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 30 +- .../internal/MarshallerMappingFileStore.java | 13 +- .../internal/pagemem/wal/record/DataEntry.java | 20 +- .../internal/pagemem/wal/record/DataRecord.java | 4 +- .../pagemem/wal/record/LazyDataEntry.java | 38 +- .../internal/pagemem/wal/record/TxRecord.java | 5 +- .../pagemem/wal/record/UnwrapDataEntry.java | 118 ++++ .../cache/binary/BinaryMetadataFileStore.java | 29 +- .../binary/CacheObjectBinaryProcessorImpl.java | 18 +- .../wal/AbstractWalRecordsIterator.java | 29 +- .../wal/FileWriteAheadLogManager.java | 10 +- .../cache/persistence/wal/RecordSerializer.java | 5 + .../wal/reader/IgniteWalIteratorFactory.java | 129 +++- .../wal/reader/StandaloneGridKernalContext.java | 116 ++- .../reader/StandaloneIgnitePluginProcessor.java | 6 +- .../reader/StandaloneNoopCommunicationSpi.java | 83 +++ .../wal/reader/StandaloneNoopDiscoverySpi.java | 127 ++++ .../reader/StandaloneWalRecordsIterator.java | 143 +++- .../wal/serializer/RecordV1Serializer.java | 5 + .../db/wal/reader/IgniteWalReaderTest.java | 698 +++++++++++++++++-- .../db/wal/reader/IndexedObject.java | 79 +++ .../db/wal/reader/MockWalIteratorFactory.java | 14 +- modules/dev-utils/pom.xml | 2 +- .../development/utils/IgniteWalConverter.java | 5 +- .../ant/beautifier/GridJavadocAntTask.java | 8 +- parent/pom.xml | 4 + 26 
files changed, 1584 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 0b3f220..f57bda7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.util.AbstractMap; @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@ -88,6 +89,12 @@ public class MarshallerContextImpl implements MarshallerContext { private boolean clientNode; /** + * Marshaller mapping file store directory. {@code null} used for standard folder, in this case folder is calculated + * from work directory. Non null value may be used to setup custom directory from outside + */ + @Nullable private File marshallerMappingFileStoreDir; + + /** * Initializes context. * * @param plugins Plugins. 
@@ -491,7 +498,10 @@ public class MarshallerContextImpl implements MarshallerContext { IgniteConfiguration cfg = ctx.config(); String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome()); - fileStore = new MarshallerMappingFileStore(workDir, ctx.log(MarshallerMappingFileStore.class)); + final IgniteLogger fileStoreLog = ctx.log(MarshallerMappingFileStore.class); + fileStore = marshallerMappingFileStoreDir == null ? + new MarshallerMappingFileStore(workDir, fileStoreLog) : + new MarshallerMappingFileStore(fileStoreLog, marshallerMappingFileStoreDir); this.transport = transport; closProc = ctx.closure(); clientNode = ctx.clientNode(); @@ -537,6 +547,22 @@ public class MarshallerContextImpl implements MarshallerContext { } /** + * @return custom marshaller mapping files directory. Used for standalone WAL iteration + */ + @Nullable public File getMarshallerMappingFileStoreDir() { + return marshallerMappingFileStoreDir; + } + + /** + * Sets custom marshaller mapping files directory. 
Used for standalone WAL iteration + * + * @param marshallerMappingFileStoreDir directory with type name mappings + */ + public void setMarshallerMappingFileStoreDir(@Nullable final File marshallerMappingFileStoreDir) { + this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir; + } + + /** * */ static final class CombinedMap extends AbstractMap<Integer, MappedName> http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java index e4a844e..eabbdb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java @@ -51,13 +51,14 @@ final class MarshallerMappingFileStore { /** */ private final IgniteLogger log; - /** */ + /** Marshaller mapping directory */ private final File workDir; /** */ private final String FILE_EXTENSION = ".classname"; /** + * @param igniteWorkDir Ignite work directory * @param log Logger. */ MarshallerMappingFileStore(String igniteWorkDir, IgniteLogger log) throws IgniteCheckedException { @@ -66,6 +67,16 @@ final class MarshallerMappingFileStore { } /** + * Creates marshaller mapping file store with custom predefined work directory + * @param log logger. + * @param marshallerMappingFileStoreDir custom marshaller work directory + */ + MarshallerMappingFileStore(final IgniteLogger log, final File marshallerMappingFileStoreDir) { + this.workDir = marshallerMappingFileStoreDir; + this.log = log; + } + + /** * @param platformId Platform id. * @param typeId Type id. * @param typeName Type name. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java index 9ebf306..d4e0b9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java @@ -20,41 +20,39 @@ package org.apache.ignite.internal.pagemem.wal.record; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; /** - * + * Represents Data Entry ({@link #key}, {@link #val value}) pair update {@link #op operation} in WAL log. */ public class DataEntry { - /** */ + /** Cache ID. */ @GridToStringInclude protected int cacheId; - /** */ + /** Cache object key */ protected KeyCacheObject key; - /** */ + /** Cache object value */ protected CacheObject val; - /** */ + /** Entry operation performed */ @GridToStringInclude protected GridCacheOperation op; - /** */ + /** Near transaction version. */ protected GridCacheVersion nearXidVer; - /** */ + /** Write version. */ @GridToStringInclude protected GridCacheVersion writeVer; - /** */ + /** Expire time. */ protected long expireTime; - /** */ + /** Partition ID. 
*/ @GridToStringInclude protected int partId; http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java index c6322c3..0e92383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java @@ -23,7 +23,9 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; /** - * + * Logical data record with cache operation description. + * This record contains information about operation we want to do. + * Contains operation type (put, remove) and (Key, Value, Version) for each {@link DataEntry} */ public class DataRecord extends WALRecord { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java index 98db831..0ad87d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java @@ -28,29 +28,36 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; /** - * + * Represents Data Entry ({@link #key}, {@link #val 
value}) pair update {@link #op operation}. <br> + * This Data entry was not converted to key, value pair during record deserialization. */ public class LazyDataEntry extends DataEntry { /** */ private GridCacheSharedContext cctx; - /** */ + /** Data Entry key type code. See {@link CacheObject} for built-in value type codes */ private byte keyType; - /** */ + /** Key value bytes. */ private byte[] keyBytes; - /** */ + /** Data Entry Value type code. See {@link CacheObject} for built-in value type codes */ private byte valType; - /** */ + /** Value value bytes. */ private byte[] valBytes; /** + * @param cctx Shared context. * @param cacheId Cache ID. + * @param keyType Object type code for Key. + * @param keyBytes Data Entry Key value bytes. + * @param valType Object type code for Value. + * @param valBytes Data Entry Value value bytes. * @param op Operation. * @param nearXidVer Near transaction version. * @param writeVer Write version. + * @param expireTime Expire time. * @param partId Partition ID. * @param partCnt Partition counter. */ @@ -113,4 +120,25 @@ public class LazyDataEntry extends DataEntry { return val; } + + /** @return Data Entry Key type code. See {@link CacheObject} for built-in value type codes */ + public byte getKeyType() { + return keyType; + } + + /** @return Key value bytes. */ + public byte[] getKeyBytes() { + return keyBytes; + } + + /** @return Data Entry Value type code. See {@link CacheObject} for built-in value type codes */ + public byte getValType() { + return valType; + } + + /** @return Value value bytes. 
*/ + public byte[] getValBytes() { + return valBytes; + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java index 2fbcf4f..9bb747b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java @@ -22,7 +22,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; /** - * + * Logical data record intended for transaction (tx) related actions.<br> + * This record is a marker of begin, prepare, commit, and rollback transactions. 
*/ public class TxRecord extends WALRecord { /** @@ -58,7 +59,7 @@ public class TxRecord extends WALRecord { /** */ private TxAction action; - /** */ + /** Global transaction identifier within cluster, assigned by transaction coordinator */ private GridCacheVersion nearXidVer; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java new file mode 100644 index 0000000..678539d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagemem.wal.record; + +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** + * Data Entry for automatic unwrapping key and value from Data Entry + */ +public class UnwrapDataEntry extends DataEntry { + /** Cache object value context. Context is used for unwrapping objects. */ + private final CacheObjectValueContext cacheObjValCtx; + + /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */ + private boolean keepBinary; + + /** + * @param cacheId Cache ID. + * @param key Key. + * @param val Value. + * @param op Operation. + * @param nearXidVer Near transaction version. + * @param writeVer Write version. + * @param expireTime Expire time. + * @param partId Partition ID. + * @param partCnt Partition counter. + * @param cacheObjValCtx cache object value context for unwrapping objects. + * @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead + */ + public UnwrapDataEntry( + final int cacheId, + final KeyCacheObject key, + final CacheObject val, + final GridCacheOperation op, + final GridCacheVersion nearXidVer, + final GridCacheVersion writeVer, + final long expireTime, + final int partId, + final long partCnt, + final CacheObjectValueContext cacheObjValCtx, + final boolean keepBinary) { + super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt); + this.cacheObjValCtx = cacheObjValCtx; + this.keepBinary = keepBinary; + } + + /** + * Unwraps key value from cache key object into primitive boxed type or source class. 
If client classes were used + * in key, call of this method requires classes to be available in classpath + * + * @return Key which was placed into cache. Or null if failed + */ + public Object unwrappedKey() { + try { + if (keepBinary && key instanceof BinaryObject) + return key; + Object unwrapped = key.value(cacheObjValCtx, false); + if (unwrapped instanceof BinaryObject) { + if (keepBinary) + return unwrapped; + unwrapped = ((BinaryObject)unwrapped).deserialize(); + } + return unwrapped; + } + catch (Exception e) { + cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class) + .error("Unable to convert key [" + key + "]", e); + return null; + } + } + + /** + * Unwraps value value from cache value object into primitive boxed type or source class. If client classes were + * used in key, call of this method requires classes to be available in classpath + * + * @return Value which was placed into cache. Or null if failed + */ + public Object unwrappedValue() { + try { + if (keepBinary && val instanceof BinaryObject) + return val; + return val.value(cacheObjValCtx, false); + } + catch (Exception e) { + cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class) + .error("Unable to convert value [" + value() + "]", e); + return null; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getSimpleName() + "[k = " + unwrappedKey() + ", v = [ " + + unwrappedValue() + + "], super = [" + + super.toString() + "]]"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java index e682593..2d4114f 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; /** * Class handles saving/restoring binary metadata to/from disk. @@ -49,8 +50,14 @@ class BinaryMetadataFileStore { * @param metadataLocCache Metadata locale cache. * @param ctx Context. * @param log Logger. + * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta and consistentId */ - BinaryMetadataFileStore(ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache, GridKernalContext ctx, IgniteLogger log) throws IgniteCheckedException { + BinaryMetadataFileStore( + final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache, + final GridKernalContext ctx, + final IgniteLogger log, + @Nullable final File binaryMetadataFileStoreDir) throws IgniteCheckedException { + this.metadataLocCache = metadataLocCache; this.ctx = ctx; this.log = log; @@ -58,14 +65,18 @@ class BinaryMetadataFileStore { if (!ctx.config().isPersistentStoreEnabled()) return; - String consId = U.maskForFileName(ctx.discovery().consistentId().toString()); - - workDir = new File(U.resolveWorkDirectory( - ctx.config().getWorkDirectory(), - "binary_meta", - false - ), - consId); + if (binaryMetadataFileStoreDir != null) + workDir = binaryMetadataFileStoreDir; + else { + String consId = U.maskForFileName(ctx.discovery().consistentId().toString()); + + workDir = new File(U.resolveWorkDirectory( + ctx.config().getWorkDirectory(), + "binary_meta", + false + ), + consId); + } U.ensureDirectory(workDir, "directory for serialized binary metadata", log); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 5b43f8f..0b85d2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.binary; +import java.io.File; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -109,6 +110,13 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** */ private BinaryMetadataFileStore metadataFileStore; + /** + * Custom folder specifying local folder for {@link #metadataFileStore}.<br> + * {@code null} means no specific folder is configured. 
<br> + * In this case folder for metadata is composed from work directory and consistentId <br> + */ + @Nullable private File binaryMetadataFileStoreDir; + /** */ @GridToStringExclude private IgniteBinary binaries; @@ -147,7 +155,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if (ctx.clientNode()) ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED); - metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log); + metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log, binaryMetadataFileStoreDir); transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, ctx, log); @@ -906,4 +914,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } } } + + /** + * Sets path to binary metadata store configured by user, should include binary_meta and consistentId + * @param binaryMetadataFileStoreDir path to binary_meta + */ + public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) { + this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index db949c3..2749d5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -65,8 +65,11 @@ public abstract class AbstractWalRecordsIterator /** Logger 
*/ @NotNull protected final IgniteLogger log; - /** Shared context for creating serializer of required version and grid name access */ - @NotNull private final GridCacheSharedContext sharedCtx; + /** + * Shared context for creating serializer of required version and grid name access. Also cacheObjects processor from + * this context may be used to convert Data entry key and value from its binary representation into objects. + */ + @NotNull protected final GridCacheSharedContext sharedCtx; /** Serializer of current version to read headers. */ @NotNull private final RecordSerializer serializer; @@ -78,10 +81,11 @@ public abstract class AbstractWalRecordsIterator private final ByteBufferExpander buf; /** - * @param log Logger - * @param sharedCtx Shared context + * @param log Logger. + * @param sharedCtx Shared context. * @param serializer Serializer of current version to read headers. - * @param bufSize buffer for reading records size + * @param ioFactory ioFactory for file IO access. + * @param bufSize buffer for reading records size. */ protected AbstractWalRecordsIterator( @NotNull final IgniteLogger log, @@ -210,7 +214,7 @@ public abstract class AbstractWalRecordsIterator ptr.length(rec.size()); // cast using diamond operator here can break compile for 7 - return new IgniteBiTuple<>((WALPointer)ptr, rec); + return new IgniteBiTuple<>((WALPointer)ptr, postProcessRecord(rec)); } catch (IOException | IgniteCheckedException e) { if (!(e instanceof SegmentEofException)) @@ -220,6 +224,17 @@ public abstract class AbstractWalRecordsIterator } /** + * Performs final conversions with record loaded from WAL. + * To be overridden by subclasses if any processing required. + * + * @param rec record to post process. + * @return post processed record. 
+ */ + @NotNull protected WALRecord postProcessRecord(@NotNull final WALRecord rec) { + return rec; + } + + /** * Handler for record deserialization exception * @param e problem from records reading * @param ptr file pointer was accessed @@ -260,7 +275,7 @@ public abstract class AbstractWalRecordsIterator int ver = ((HeaderRecord)rec).version(); - RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver); + RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver, serializer.writePointer()); if (start != null && desc.idx == start.index()) in.seek(start.fileOffset()); http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index d2e85f1..18584a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -1010,12 +1010,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return Entry serializer. */ static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { + return forVersion(cctx, ver, false); + } + + /** + * @param ver Serializer version. + * @return Entry serializer. 
+ */ + static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver, boolean writePointer) throws IgniteCheckedException { if (ver <= 0) throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file)."); switch (ver) { case 1: - return new RecordV1Serializer(cctx); + return new RecordV1Serializer(cctx, writePointer); default: throw new IgniteCheckedException("Failed to create a serializer with the given version " + http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java index 1ea7fa6..12e16a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java @@ -54,4 +54,9 @@ public interface RecordSerializer { * @return Read entry. 
*/ public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; + + /** + * Flag to write (or not) wal pointer to record + */ + public boolean writePointer(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 4e3998b..3a34e28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -21,11 +21,13 @@ import java.io.File; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Factory for creating iterator over WAL files @@ -33,17 +35,90 @@ import org.jetbrains.annotations.NotNull; public class IgniteWalIteratorFactory { /** Logger. 
*/ private final IgniteLogger log; - /** Page size, in standalone iterator mode this value can't be taken from memory configuration */ + + /** Page size, in standalone iterator mode this value can't be taken from memory configuration. */ private final int pageSize; + + /** + * Folder specifying location of metadata File Store. {@code null} means no specific folder is configured. <br> + * This folder should be specified for converting data entries into BinaryObjects + */ + @Nullable private File binaryMetadataFileStoreDir; + + /** + * Folder specifying location of marshaller mapping file store. {@code null} means no specific folder is configured. <br> + * This folder should be specified for converting data entries into BinaryObjects. + * Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects will be provided + */ + @Nullable private File marshallerMappingFileStoreDir; + + /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */ + private boolean keepBinary; + /** Factory to provide I/O interfaces for read/write operations with files */ private final FileIOFactory ioFactory; /** - * Creates WAL files iterator factory + * Creates WAL files iterator factory. + * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects + * * @param log Logger. - * @param pageSize Page size, size is validated + * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated + * according its boundaries. + * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. Should include "binary_meta" + * subfolder and consistent ID subfolder. Note Consistent ID should be already masked and should not contain special + * symbols. Providing {@code null} means no specific folder is configured. <br> + * @param marshallerMappingFileStoreDir Folder specifying location of marshaller mapping file store. 
Should include + * "marshaller" subfolder. Providing {@code null} will disable unmarshall for non primitive objects, + * BinaryObjects will be provided + * @param keepBinary {@code true} disables complex object unmarshall into source classes */ - public IgniteWalIteratorFactory(@NotNull IgniteLogger log, @NotNull FileIOFactory ioFactory, int pageSize) { + public IgniteWalIteratorFactory( + @NotNull final IgniteLogger log, + final int pageSize, + @Nullable final File binaryMetadataFileStoreDir, + @Nullable final File marshallerMappingFileStoreDir, + final boolean keepBinary) { + this.log = log; + this.pageSize = pageSize; + this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir; + this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir; + this.keepBinary = keepBinary; + this.ioFactory = new PersistentStoreConfiguration().getFileIOFactory(); + new MemoryConfiguration().setPageSize(pageSize); // just for validate + } + + /** + * Creates WAL files iterator factory. + * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects + * + * @param log Logger. + * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated + * according its boundaries. + * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. Should include "binary_meta" + * subfolder and consistent ID subfolder. Note Consistent ID should be already masked and should not contain special + * symbols. Providing {@code null} means no specific folder is configured. <br> + * @param marshallerMappingFileStoreDir Folder specifying location of marshaller mapping file store. Should include + * "marshaller" subfolder. 
Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects + * will be provided + */ + public IgniteWalIteratorFactory( + @NotNull final IgniteLogger log, + final int pageSize, + @Nullable final File binaryMetadataFileStoreDir, + @Nullable final File marshallerMappingFileStoreDir) { + this(log, pageSize, binaryMetadataFileStoreDir, marshallerMappingFileStoreDir, false); + } + + /** + * Creates WAL files iterator factory. This constructor does not allow WAL iterators access to data entries key and value. + * + * @param log Logger. + * @param ioFactory Custom factory for non-standard file API to be used in WAL reading. + * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated + * according its boundaries. + */ + public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, @NotNull final FileIOFactory ioFactory, int pageSize) { this.log = log; this.pageSize = pageSize; this.ioFactory = ioFactory; @@ -51,48 +126,72 @@ public class IgniteWalIteratorFactory { } /** + * Creates WAL files iterator factory. This constructor does not allow WAL iterators access to data entries key and + * value. + * + * @param log Logger. + * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated + * according its boundaries. + */ + public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) { + this(log, new PersistentStoreConfiguration().getFileIOFactory(), pageSize); + } + + /** * Creates iterator for (archive) directory scan mode. * Note in this mode total scanned files at end of iteration may be wider that initial files in directory. * This mode does not support work directory scan because work directory contains unpredictable number in file name. * Such file may broke iteration. * - * @param walDirWithConsistentId directory with WAL files. 
Should already contain node consistent ID as subfolder + * @param walDirWithConsistentId directory with WAL files. Should already contain node consistent ID as subfolder. + * Note: 'Consistent ID'-based subfolder name (if any) should not contain special symbols. * @return closable WAL records iterator, should be closed when non needed * @throws IgniteCheckedException if failed to read folder */ - public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory); + public WALIterator iteratorArchiveDirectory( + @NotNull final File walDirWithConsistentId) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, keepBinary); } /** * Creates iterator for file by file scan mode. * This method may be used only for archive folder (not for work). * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored - * @param files files to scan. Order it not important, but is significant to provide all segments without omissions + * + * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions. + * Parameter should contain direct file links to '.wal' files from archive directory. + * 'Consistent ID'-based subfolder name (if any) should not contain special symbols. + * Special symbols should be already masked. + * * @return closable WAL records iterator, should be closed when non needed * @throws IgniteCheckedException if failed to read files */ - public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, files); + public WALIterator iteratorArchiveFiles(@NotNull final File... 
files) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, keepBinary, files); } /** * Creates iterator for file by file scan mode. * This method may be used for work folder, file indexes are scanned from the file context. * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. - * @param files files to scan. Order it not important, but is significant to provide all segments without omissions + * + * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions. + * Parameter should contain direct file links to '.wal' files from work directory. + * 'Consistent ID'-based subfolder name (if any) should not contain special symbols. + * Special symbols should be already masked. + * * @return closable WAL records iterator, should be closed when non needed * @throws IgniteCheckedException if failed to read files */ - public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, files); + public WALIterator iteratorWorkFiles(@NotNull final File... 
files) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, keepBinary, files); } /** * @return fake shared context required for create minimal services for record reading */ - @NotNull private GridCacheSharedContext prepareSharedCtx() { - final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log); + @NotNull private GridCacheSharedContext prepareSharedCtx() throws IgniteCheckedException { + final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log, binaryMetadataFileStoreDir, marshallerMappingFileStoreDir); final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 8752eaa..c2afdef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader; +import java.io.File; +import java.lang.reflect.Field; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -25,11 +27,14 @@ import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; +import 
org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalGateway; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -41,6 +46,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -73,6 +79,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.StripedExecutor; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; @@ -83,16 +90,36 @@ import org.jetbrains.annotations.Nullable; * Dummy grid kernal context */ public class StandaloneGridKernalContext implements GridKernalContext { - /** */ + /** Config for fake 
Ignite instance. */ + private final IgniteConfiguration cfg; + + /** Logger. */ private IgniteLogger log; - /** */ + /** Empty plugin processor. */ private IgnitePluginProcessor pluginProc; /** + * Cache object processor. Used for converting cache objects and keys into binary objects. Null means there is no + * convert is configured. All entries in this case will be lazy data entries. + */ + @Nullable private IgniteCacheObjectProcessor cacheObjProcessor; + + /** Marshaller context implementation. */ + private MarshallerContextImpl marshallerContext; + + /** * @param log Logger. + * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. + * {@code null} means no specific folder is configured. <br> + * + * @param marshallerMappingFileStoreDir folder specifying location of marshaller mapping file store. + * {@code null} means no specific folder is configured. + * Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects will be provided <br> */ - StandaloneGridKernalContext(IgniteLogger log) { + StandaloneGridKernalContext(IgniteLogger log, + @Nullable final File binaryMetadataFileStoreDir, + @Nullable final File marshallerMappingFileStoreDir) throws IgniteCheckedException { this.log = log; try { @@ -102,6 +129,56 @@ public class StandaloneGridKernalContext implements GridKernalContext { catch (IgniteCheckedException e) { throw new IllegalStateException("Must not fail on empty providers list.", e); } + + this.marshallerContext = new MarshallerContextImpl(null); + this.cfg = prepareIgniteConfiguration(); + this.cacheObjProcessor = binaryMetadataFileStoreDir != null ? 
binaryProcessor(this, binaryMetadataFileStoreDir) : null; + + if (marshallerMappingFileStoreDir != null) { + marshallerContext.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir); + marshallerContext.onMarshallerProcessorStarted(this, null); + } + } + + /** + * Creates binary processor which allows to convert WAL records into objects + * + * @param ctx kernal context + * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store + * + * {@code null} means no specific folder is configured. <br> In this case folder for metadata is composed from work + * directory and consistentId + * @return Cache object processor able to restore data records content into binary objects + * @throws IgniteCheckedException Throws in case of initialization errors. + */ + private IgniteCacheObjectProcessor binaryProcessor( + final GridKernalContext ctx, + final File binaryMetadataFileStoreDir) throws IgniteCheckedException { + + final CacheObjectBinaryProcessorImpl processor = new CacheObjectBinaryProcessorImpl(ctx); + processor.setBinaryMetadataFileStoreDir(binaryMetadataFileStoreDir); + processor.start(); + return processor; + } + + /** + * @return Ignite configuration which allows to start requied processors for WAL reader + */ + private IgniteConfiguration prepareIgniteConfiguration() { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setDiscoverySpi(new StandaloneNoopDiscoverySpi()); + cfg.setCommunicationSpi(new StandaloneNoopCommunicationSpi()); + + final Marshaller marshaller = new BinaryMarshaller(); + cfg.setMarshaller(marshaller); + + PersistentStoreConfiguration pstCfg = new PersistentStoreConfiguration(); + cfg.setPersistentStoreConfiguration(pstCfg); + + marshaller.setContext(marshallerContext); + + return cfg; } /** {@inheritDoc} */ @@ -141,12 +218,21 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public IgniteEx grid() { - return null; + final IgniteEx kernal = 
new IgniteKernal(); + try { + Field fieldCfg = kernal.getClass().getDeclaredField("cfg"); + fieldCfg.setAccessible(true); + fieldCfg.set(kernal, cfg); + } + catch (NoSuchFieldException | IllegalAccessException e) { + log.error("", e); + } + return kernal; } /** {@inheritDoc} */ @Override public IgniteConfiguration config() { - return null; + return cfg; } /** {@inheritDoc} */ @@ -251,7 +337,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public PoolProcessor pools() { - return null; + return new PoolProcessor(this); } /** {@inheritDoc} */ @@ -276,7 +362,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public IgniteCacheObjectProcessor cacheObjects() { - return null; + return cacheObjProcessor; } /** {@inheritDoc} */ @@ -301,12 +387,16 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public GridIoManager io() { - return null; + return new GridIoManager(this); } /** {@inheritDoc} */ @Override public GridDiscoveryManager discovery() { - return null; + return new GridDiscoveryManager(StandaloneGridKernalContext.this) { + @Override public Object consistentId() { + return ""; // some non null value is required + } + }; } /** {@inheritDoc} */ @@ -350,7 +440,8 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ - @Override public void markSegmented() { } + @Override public void markSegmented() { + } /** {@inheritDoc} */ @Override public boolean segmented() { @@ -358,7 +449,8 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ - @Override public void printMemoryStats() { } + @Override public void printMemoryStats() { + } /** {@inheritDoc} */ @Override public boolean isDaemon() { @@ -487,7 +579,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public 
MarshallerContextImpl marshallerContext() { - return null; + return marshallerContext; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java index 838fc85..78588ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java @@ -25,14 +25,14 @@ import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.plugin.PluginProvider; /** - * + * No operation, empty plugin processor for creating WAL iterator without node start up */ -public class StandaloneIgnitePluginProcessor extends IgnitePluginProcessor { +class StandaloneIgnitePluginProcessor extends IgnitePluginProcessor { /** * @param ctx Kernal context. * @param cfg Ignite configuration. 
*/ - public StandaloneIgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) throws IgniteCheckedException { + StandaloneIgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) throws IgniteCheckedException { super(ctx, cfg, Collections.<PluginProvider>emptyList()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopCommunicationSpi.java new file mode 100644 index 0000000..2029647 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopCommunicationSpi.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.io.Serializable; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiNoop; +import org.apache.ignite.spi.communication.CommunicationListener; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.jetbrains.annotations.Nullable; + +/** + * No-operation SPI for standalone WAL reader + */ +@IgniteSpiNoop +public class StandaloneNoopCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi { + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { + + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode destNode, Serializable msg) throws IgniteSpiException { + + } + + /** {@inheritDoc} */ + @Override public int getSentMessagesCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long getSentBytesCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public int getReceivedMessagesCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long getReceivedBytesCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public int getOutboundMessagesQueueSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + + } + + /** {@inheritDoc} */ + @Override public void setListener(@Nullable CommunicationListener lsnr) { + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java new file mode 100644 index 0000000..3946c4f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiNoop; +import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.jetbrains.annotations.Nullable; + +/** + * No-operation SPI for standalone WAL reader + */ +@IgniteSpiNoop +public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { + /** {@inheritDoc} */ + @Nullable @Override public Serializable consistentId() throws IgniteSpiException { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterNode getLocalNode() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return false; + } + + /** {@inheritDoc} */ + @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + + } + + /** {@inheritDoc} */ + @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { + + } + + /** {@inheritDoc} */ + @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { + + } + + /** {@inheritDoc} */ + @Override public void 
setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { + + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + + } + + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { + + } + + /** {@inheritDoc} */ + @Override public long getGridStartTime() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + + } + + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId, @Nullable String warning) { + + } + + /** {@inheritDoc} */ + @Override public boolean isClientMode() throws IllegalStateException { + return false; + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { + + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 5bbd7da..900aab5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -28,8 +28,16 @@ import java.util.Collections; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import 
org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; +import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; @@ -39,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointe import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -77,49 +86,61 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { */ private boolean workDir; + /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */ + private boolean keepBinary; + /** * Creates iterator in directory scan mode - * - * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder + * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder * @param log Logger. 
- * @param sharedCtx Shared context. + * @param sharedCtx Shared context. Cache processor is to be configured if Cache Object Key & Data Entry is + * required. * @param ioFactory File I/O factory. + * @param keepBinary Keep binary. This flag disables converting of non primitive types + * (BinaryObjects will be used instead) */ StandaloneWalRecordsIterator( - @NotNull File walFilesDir, - @NotNull IgniteLogger log, - @NotNull GridCacheSharedContext sharedCtx, - @NotNull FileIOFactory ioFactory) throws IgniteCheckedException { + @NotNull final File walFilesDir, + @NotNull final IgniteLogger log, + @NotNull final GridCacheSharedContext sharedCtx, + @NotNull final FileIOFactory ioFactory, + final boolean keepBinary) throws IgniteCheckedException { super(log, sharedCtx, new RecordV1Serializer(sharedCtx, true), ioFactory, BUF_SIZE); + this.keepBinary = keepBinary; init(walFilesDir, false, null); advance(); } /** * Creates iterator in file-by-file iteration mode. Directory - * - * @param log Logger. - * @param sharedCtx Shared context. + * @param log Logger. + * @param sharedCtx Shared context. Cache processor is to be configured if Cache Object Key & Data Entry is + * required. * @param ioFactory File I/O factory. * @param workDir Work directory is scanned, false - archive + * @param keepBinary Keep binary. This flag disables converting of non primitive types + * (BinaryObjects will be used instead) * @param walFiles Wal files. */ StandaloneWalRecordsIterator( - @NotNull IgniteLogger log, - @NotNull GridCacheSharedContext sharedCtx, - @NotNull FileIOFactory ioFactory, - boolean workDir, - @NotNull File... walFiles) throws IgniteCheckedException { + @NotNull final IgniteLogger log, + @NotNull final GridCacheSharedContext sharedCtx, + @NotNull final FileIOFactory ioFactory, + final boolean workDir, + final boolean keepBinary, + @NotNull final File... 
walFiles) throws IgniteCheckedException { super(log, sharedCtx, - new RecordV1Serializer(sharedCtx), + new RecordV1Serializer(sharedCtx, true), ioFactory, BUF_SIZE); + this.workDir = workDir; + this.keepBinary = keepBinary; init(null, workDir, walFiles); advance(); } @@ -244,6 +265,96 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { } /** {@inheritDoc} */ + @NotNull @Override protected WALRecord postProcessRecord(@NotNull final WALRecord rec) { + final GridKernalContext kernalCtx = sharedCtx.kernalContext(); + final IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects(); + + if (processor != null && rec.type() == WALRecord.RecordType.DATA_RECORD) { + try { + return postProcessDataRecord((DataRecord)rec, kernalCtx, processor); + } + catch (Exception e) { + log.error("Failed to perform post processing for data record ", e); + } + } + return super.postProcessRecord(rec); + } + + /** + * Performs post processing of lazy data record, converts it to unwrap record. + * + * @param dataRec data record to post process records. + * @param kernalCtx kernal context. + * @param processor processor to convert binary form from WAL into CacheObject/BinaryObject. + * @return post-processed record. + * @throws IgniteCheckedException if failed. 
+ */ + @NotNull private WALRecord postProcessDataRecord( + @NotNull final DataRecord dataRec, + final GridKernalContext kernalCtx, + final IgniteCacheObjectProcessor processor) throws IgniteCheckedException { + final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext(kernalCtx, + null, null, false, false, false); + + final List<DataEntry> entries = dataRec.writeEntries(); + final List<DataEntry> postProcessedEntries = new ArrayList<>(entries.size()); + + for (DataEntry dataEntry : entries) { + final DataEntry postProcessedEntry = postProcessDataEntry(processor, fakeCacheObjCtx, dataEntry); + + postProcessedEntries.add(postProcessedEntry); + } + return new DataRecord(postProcessedEntries); + } + + /** + * Converts entry or lazy data entry into unwrapped entry + * @param processor cache object processor for de-serializing objects. + * @param fakeCacheObjCtx cache object context for de-serializing binary and unwrapping objects. + * @param dataEntry entry to process + * @return post precessed entry + * @throws IgniteCheckedException if failed + */ + @NotNull + private DataEntry postProcessDataEntry( + final IgniteCacheObjectProcessor processor, + final CacheObjectContext fakeCacheObjCtx, + final DataEntry dataEntry) throws IgniteCheckedException { + + final KeyCacheObject key; + final CacheObject val; + final File marshallerMappingFileStoreDir = + fakeCacheObjCtx.kernalContext().marshallerContext().getMarshallerMappingFileStoreDir(); + + if (dataEntry instanceof LazyDataEntry) { + final LazyDataEntry lazyDataEntry = (LazyDataEntry)dataEntry; + key = processor.toKeyCacheObject(fakeCacheObjCtx, + lazyDataEntry.getKeyType(), + lazyDataEntry.getKeyBytes()); + val = processor.toCacheObject(fakeCacheObjCtx, + lazyDataEntry.getValType(), + lazyDataEntry.getValBytes()); + } + else { + key = dataEntry.key(); + val = dataEntry.value(); + } + + return new UnwrapDataEntry( + dataEntry.cacheId(), + key, + val, + dataEntry.op(), + dataEntry.nearXidVersion(), + 
dataEntry.writeVersion(), + dataEntry.expireTime(), + dataEntry.partitionId(), + dataEntry.partitionCounter(), + fakeCacheObjCtx, + keepBinary || marshallerMappingFileStoreDir == null); + } + + /** {@inheritDoc} */ @Override protected void handleRecordException( @NotNull final Exception e, @Nullable final FileWALPointer ptr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/39b90301/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index 91e1f00..b78e2e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -163,6 +163,11 @@ public class RecordV1Serializer implements RecordSerializer { } /** {@inheritDoc} */ + @Override public boolean writePointer() { + return writePointer; + } + + /** {@inheritDoc} */ @SuppressWarnings("CastConflictsWithInstanceof") @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { assert record.size() > 0 && buf.remaining() >= record.size() : record.size();
