http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index e49b7e2..55c5fb5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.OffheapReadWriteLock; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.NotNull; @@ -190,6 +191,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** Page size. */ private final int sysPageSize; + /** Encrypted page size. */ + private final int encPageSize; + /** Shared context. 
*/ private final GridCacheSharedContext<?, ?> ctx; @@ -316,6 +320,8 @@ public class PageMemoryImpl implements PageMemoryEx { sysPageSize = pageSize + PAGE_OVERHEAD; + encPageSize = CU.encryptedPageSize(pageSize, ctx.kernalContext().config().getEncryptionSpi()); + rwLock = new OffheapReadWriteLock(128); this.memMetrics = memMetrics; @@ -484,7 +490,8 @@ public class PageMemoryImpl implements PageMemoryEx { seg.writeLock().lock(); - boolean isTrackingPage = changeTracker != null && trackingIO.trackingPageFor(pageId, pageSize()) == pageId; + boolean isTrackingPage = + changeTracker != null && trackingIO.trackingPageFor(pageId, realPageSize(grpId)) == pageId; try { long relPtr = seg.loadedPages.get( @@ -526,9 +533,9 @@ public class PageMemoryImpl implements PageMemoryEx { // We are inside segment write lock, so no other thread can pin this tracking page yet. // We can modify page buffer directly. if (PageIO.getType(pageAddr) == 0) { - trackingIO.initNewPage(pageAddr, pageId, pageSize()); + trackingIO.initNewPage(pageAddr, pageId, realPageSize(grpId)); - if (!ctx.wal().disabled(fullId.groupId())) + if (!ctx.wal().disabled(fullId.groupId())) { if (!ctx.wal().isAlwaysWriteFullPages()) ctx.wal().log( new InitNewPageRecord( @@ -538,8 +545,11 @@ public class PageMemoryImpl implements PageMemoryEx { trackingIO.getVersion(), pageId ) ); - else - ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize())); + else { + ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize(), + realPageSize(fullId.groupId()))); + } + } } } @@ -947,6 +957,14 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ + @Override public int realPageSize(int grpId) { + if (ctx.kernalContext().encryption().groupKey(grpId) == null) + return pageSize(); + + return encPageSize; + } + + /** {@inheritDoc} */ @Override public boolean safeToUpdate() { if (segments != null) { for (Segment segment : segments) @@ -1627,7 +1645,7 @@ public class PageMemoryImpl 
implements PageMemoryEx { void beforeReleaseWrite(FullPageId pageId, long ptr, boolean pageWalRec) { if (walMgr != null && (pageWalRec || walMgr.isAlwaysWriteFullPages()) && !walMgr.disabled(pageId.groupId())) { try { - walMgr.log(new PageSnapshot(pageId, ptr, pageSize())); + walMgr.log(new PageSnapshot(pageId, ptr, pageSize(), realPageSize(pageId.groupId()))); } catch (IgniteCheckedException e) { // TODO ignite-db.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java index 5e1cb81..349e877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java @@ -159,7 +159,7 @@ public abstract class BPlusIO<L> extends PageIO { /** * @param pageAddr Page address. - * @param pageSize Page size. + * @param pageSize Page size without encryption overhead. * @return Max items count. */ public abstract int getMaxCount(long pageAddr, int pageSize); @@ -331,7 +331,7 @@ public abstract class BPlusIO<L> extends PageIO { * @param leftPageAddr Left page address. * @param rightPageAddr Right page address. * @param emptyBranch We are merging an empty branch. - * @param pageSize Page size. + * @param pageSize Page size without encryption overhead. * @return {@code false} If we were not able to merge. * @throws IgniteCheckedException If failed. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java index 22d2420..ee61e25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; @@ -293,7 +294,7 @@ public abstract class PageIO { } /** - * @param pageAddr Page addres. + * @param pageAddr Page address. * @return Page type. */ public static int getType(long pageAddr) { @@ -503,6 +504,8 @@ public abstract class PageIO { * @param pageAddr Page address. * @param pageId Page ID. * @param pageSize Page size. 
+ * + * @see EncryptionSpi#encryptedSize(int) */ public void initNewPage(long pageAddr, long pageId, int pageSize) { setType(pageAddr, getType()); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionCountersIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionCountersIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionCountersIO.java index 68e6e2f..a3e92cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionCountersIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionCountersIO.java @@ -107,7 +107,7 @@ public class PagePartitionCountersIO extends PageIO { } /** - * @param pageSize Page size. + * @param pageSize Page size without encryption overhead. * @param pageAddr Page address. * @param cacheSizes Serialized cache size items (pairs of cache ID and its size). * @return Number of written pairs. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index 98c6f1f..1135868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -23,8 +23,6 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageSupport; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; @@ -211,7 +209,7 @@ public abstract class PageHandler<X, R> { /** * @param pageMem Page memory. - * @param cacheId Cache ID. + * @param grpId Group ID. * @param pageId Page ID. * @param init IO for new page initialization. * @param wal Write ahead log. 
@@ -220,20 +218,20 @@ public abstract class PageHandler<X, R> { */ public static void initPage( PageMemory pageMem, - int cacheId, + int grpId, long pageId, PageIO init, IgniteWriteAheadLogManager wal, PageLockListener lsnr ) throws IgniteCheckedException { - Boolean res = writePage(pageMem, cacheId, pageId, lsnr, PageHandler.NO_OP, init, wal, null, null, 0, FALSE); + Boolean res = writePage(pageMem, grpId, pageId, lsnr, PageHandler.NO_OP, init, wal, null, null, 0, FALSE); assert res != FALSE; } /** * @param pageMem Page memory. - * @param cacheId Cache ID. + * @param grpId Group ID. * @param pageId Page ID. * @param lsnr Lock listener. * @param h Handler. @@ -248,7 +246,7 @@ public abstract class PageHandler<X, R> { */ public static <X, R> R writePage( PageMemory pageMem, - int cacheId, + int grpId, final long pageId, PageLockListener lsnr, PageHandler<X, R> h, @@ -260,9 +258,9 @@ public abstract class PageHandler<X, R> { R lockFailed ) throws IgniteCheckedException { boolean releaseAfterWrite = true; - long page = pageMem.acquirePage(cacheId, pageId); + long page = pageMem.acquirePage(grpId, pageId); try { - long pageAddr = writeLock(pageMem, cacheId, pageId, page, lsnr, false); + long pageAddr = writeLock(pageMem, grpId, pageId, page, lsnr, false); if (pageAddr == 0L) return lockFailed; @@ -272,13 +270,13 @@ public abstract class PageHandler<X, R> { try { if (init != null) { // It is a new page and we have to initialize it. 
- doInitPage(pageMem, cacheId, pageId, page, pageAddr, init, wal); + doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); walPlc = FALSE; } else init = PageIO.getPageIO(pageAddr); - R res = h.run(cacheId, pageId, page, pageAddr, init, walPlc, arg, intArg); + R res = h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg); ok = true; @@ -287,19 +285,19 @@ public abstract class PageHandler<X, R> { finally { assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 - if (releaseAfterWrite = h.releaseAfterWrite(cacheId, pageId, page, pageAddr, arg, intArg)) - writeUnlock(pageMem, cacheId, pageId, page, pageAddr, lsnr, walPlc, ok); + if (releaseAfterWrite = h.releaseAfterWrite(grpId, pageId, page, pageAddr, arg, intArg)) + writeUnlock(pageMem, grpId, pageId, page, pageAddr, lsnr, walPlc, ok); } } finally { if (releaseAfterWrite) - pageMem.releasePage(cacheId, pageId, page); + pageMem.releasePage(grpId, pageId, page); } } /** * @param pageMem Page memory. - * @param cacheId Cache ID. + * @param grpId Group ID. * @param pageId Page ID. * @param page Page pointer. * @param lsnr Lock listener. @@ -315,7 +313,7 @@ public abstract class PageHandler<X, R> { */ public static <X, R> R writePage( PageMemory pageMem, - int cacheId, + int grpId, long pageId, long page, PageLockListener lsnr, @@ -327,7 +325,7 @@ public abstract class PageHandler<X, R> { int intArg, R lockFailed ) throws IgniteCheckedException { - long pageAddr = writeLock(pageMem, cacheId, pageId, page, lsnr, false); + long pageAddr = writeLock(pageMem, grpId, pageId, page, lsnr, false); if (pageAddr == 0L) return lockFailed; @@ -337,13 +335,13 @@ public abstract class PageHandler<X, R> { try { if (init != null) { // It is a new page and we have to initialize it. 
- doInitPage(pageMem, cacheId, pageId, page, pageAddr, init, wal); + doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); walPlc = FALSE; } else init = PageIO.getPageIO(pageAddr); - R res = h.run(cacheId, pageId, page, pageAddr, init, walPlc, arg, intArg); + R res = h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg); ok = true; @@ -352,8 +350,8 @@ public abstract class PageHandler<X, R> { finally { assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 - if (h.releaseAfterWrite(cacheId, pageId, page, pageAddr, arg, intArg)) - writeUnlock(pageMem, cacheId, pageId, page, pageAddr, lsnr, walPlc, ok); + if (h.releaseAfterWrite(grpId, pageId, page, pageAddr, arg, intArg)) + writeUnlock(pageMem, grpId, pageId, page, pageAddr, lsnr, walPlc, ok); } } @@ -408,7 +406,7 @@ public abstract class PageHandler<X, R> { /** * @param pageMem Page memory. - * @param cacheId Cache ID. + * @param grpId Group ID. * @param pageId Page ID. * @param page Page pointer. * @param pageAddr Page address. @@ -418,7 +416,7 @@ public abstract class PageHandler<X, R> { */ private static void doInitPage( PageMemory pageMem, - int cacheId, + int grpId, long pageId, long page, long pageAddr, @@ -427,11 +425,11 @@ public abstract class PageHandler<X, R> { assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 - init.initNewPage(pageAddr, pageId, pageMem.pageSize()); + init.initNewPage(pageAddr, pageId, pageMem.realPageSize(grpId)); // Here we should never write full page, because it is known to be new. 
- if (isWalDeltaRecordNeeded(pageMem, cacheId, pageId, page, wal, FALSE)) - wal.log(new InitNewPageRecord(cacheId, pageId, + if (isWalDeltaRecordNeeded(pageMem, grpId, pageId, page, wal, FALSE)) + wal.log(new InitNewPageRecord(grpId, pageId, init.getType(), init.getVersion(), pageId)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 3f35c5f..6d379a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.indexing.GridIndexingManager; @@ -458,6 +459,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public GridEncryptionManager encryption() { + return null; + } + + /** {@inheritDoc} */ @Override public WorkersRegistry 
workersRegistry() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index c33a45b..25432d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRe import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; @@ -375,6 +376,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { final IgniteCacheObjectProcessor processor, final CacheObjectContext fakeCacheObjCtx, final DataEntry dataEntry) throws IgniteCheckedException { + if(dataEntry instanceof EncryptedDataEntry) + return dataEntry; final KeyCacheObject key; final CacheObject val; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index 752777a..2c8f03f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; import java.io.DataInput; import java.io.EOFException; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -28,17 +29,21 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.CacheState; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.EncryptedRecord; import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import 
org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; +import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord; @@ -82,6 +87,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecor import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -91,11 +97,23 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInputImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import 
org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.putRecordType; /** * Record data V1 serializer. @@ -107,15 +125,27 @@ public class RecordDataV1Serializer implements RecordDataSerializer { /** Cache shared context */ private final GridCacheSharedContext cctx; - /** Size of page used for PageMemory regions */ + /** Size of page used for PageMemory regions. */ private final int pageSize; + /** Size of page without encryption overhead. */ + private final int realPageSize; + /** Cache object processor to reading {@link DataEntry DataEntries} */ private final IgniteCacheObjectProcessor co; /** Serializer of {@link TxRecord} records. */ private TxRecordSerializer txRecordSerializer; + /** Encryption SPI instance. */ + private final EncryptionSpi encSpi; + + /** */ + private static final byte ENCRYPTED = 1; + + /** */ + private static final byte PLAIN = 0; + /** * @param cctx Cache shared context. 
*/ @@ -124,10 +154,174 @@ public class RecordDataV1Serializer implements RecordDataSerializer { this.txRecordSerializer = new TxRecordSerializer(); this.co = cctx.kernalContext().cacheObjects(); this.pageSize = cctx.database().pageSize(); + this.encSpi = cctx.gridConfig().getEncryptionSpi(); + + // The SPI is null on offline WAL iteration (we don't have encryption keys available). + if (encSpi != null) + this.realPageSize = CU.encryptedPageSize(pageSize, encSpi); + else + this.realPageSize = pageSize; } /** {@inheritDoc} */ @Override public int size(WALRecord record) throws IgniteCheckedException { + int clSz = plainSize(record); + + if (needEncryption(record)) + return encSpi.encryptedSize(clSz) + 4 /* groupId */ + 4 /* data size */ + REC_TYPE_SIZE; + + return clSz; + } + + /** {@inheritDoc} */ + @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in) + throws IOException, IgniteCheckedException { + if (type == ENCRYPTED_RECORD) { + if (encSpi == null) { + T2<Integer, RecordType> knownData = skipEncryptedRecord(in, true); + + // This happens on offline WAL iteration (we don't have encryption keys available). + return new EncryptedRecord(knownData.get1(), knownData.get2()); + } + + T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, true); + + // This happens during startup. On first WAL iteration we restore only metastore. + // So, no encryption keys are available.
See GridCacheDatabaseSharedManager#readMetastore + if (clData.get1() == null) + return new EncryptedRecord(clData.get2(), clData.get3()); + + return readPlainRecord(clData.get3(), clData.get1(), true); + } + + return readPlainRecord(type, in, false); + } + + /** {@inheritDoc} */ + @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { + if (needEncryption(rec)) { + int clSz = plainSize(rec); + + ByteBuffer clData = ByteBuffer.allocate(clSz); + + writePlainRecord(rec, clData); + + clData.rewind(); + + writeEncryptedData(((WalRecordCacheGroupAware)rec).groupId(), rec.type(), clData, buf); + + return; + } + + writePlainRecord(rec, buf); + } + + /** + * @param rec Record to check. + * @return {@code True} if this record should be encrypted. + */ + private boolean needEncryption(WALRecord rec) { + if (!(rec instanceof WalRecordCacheGroupAware)) + return false; + + return needEncryption(((WalRecordCacheGroupAware)rec).groupId()); + } + + /** + * @param grpId Group id. + * @return {@code True} if an encryption key is registered for this group, i.e. its records should be encrypted. + */ + private boolean needEncryption(int grpId) { + return cctx.kernalContext().encryption().groupKey(grpId) != null; + } + + /** + * Reads and decrypts data from {@code in} stream. + * + * @param in Input stream. + * @param readType If {@code true} plain record type will be read from {@code in}. + * @return Plain data stream ({@code null} if the group key is not available), group id, plain record type. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed.
+ */ + private T3<ByteBufferBackedDataInput, Integer, RecordType> readEncryptedData(ByteBufferBackedDataInput in, + boolean readType) + throws IOException, IgniteCheckedException { + int grpId = in.readInt(); + int encRecSz = in.readInt(); + RecordType plainRecType = null; + + if (readType) + plainRecType = RecordV1Serializer.readRecordType(in); + + byte[] encData = new byte[encRecSz]; + + in.readFully(encData); + + Serializable key = cctx.kernalContext().encryption().groupKey(grpId); + + if (key == null) + return new T3<>(null, grpId, plainRecType); + + byte[] clData = encSpi.decrypt(encData, key); + + return new T3<>(new ByteBufferBackedDataInputImpl().buffer(ByteBuffer.wrap(clData)), grpId, plainRecType); + } + + /** + * Reads encrypted record without decryption. + * Should be used only for an offline WAL iteration. + * + * @param in Data stream. + * @param readType If {@code true} plain record type will be read from {@code in}. + * @return Group id and type of skipped record. + */ + private T2<Integer, RecordType> skipEncryptedRecord(ByteBufferBackedDataInput in, boolean readType) + throws IOException, IgniteCheckedException { + int grpId = in.readInt(); + int encRecSz = in.readInt(); + RecordType plainRecType = null; + + if (readType) + plainRecType = RecordV1Serializer.readRecordType(in); + + int skipped = in.skipBytes(encRecSz); + + assert skipped == encRecSz; + + return new T2<>(grpId, plainRecType); + } + + /** + * Writes encrypted {@code clData} to {@code dst} stream. + * + * @param grpId Group id. + * @param plainRecType Plain record type. + * @param clData Plain data. + * @param dst Destination buffer.
+ */ + private void writeEncryptedData(int grpId, @Nullable RecordType plainRecType, ByteBuffer clData, ByteBuffer dst) { + int dtSz = encSpi.encryptedSize(clData.capacity()); + + dst.putInt(grpId); + dst.putInt(dtSz); + + if (plainRecType != null) + putRecordType(dst, plainRecType); + + Serializable key = cctx.kernalContext().encryption().groupKey(grpId); + + assert key != null; + + encSpi.encrypt(clData, key, dst); + } + + /** + * @param record Record to measure. + * @return Plain(without encryption) size of serialized rec in bytes. + * @throws IgniteCheckedException If failed. + */ + int plainSize(WALRecord record) throws IgniteCheckedException { switch (record.type()) { case PAGE_RECORD: assert record instanceof PageSnapshot; @@ -313,8 +507,19 @@ public class RecordDataV1Serializer implements RecordDataSerializer { } } - /** {@inheritDoc} */ - @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + /** + * Reads {@code WalRecord} of {@code type} from input. + * Input should be plain(not encrypted). + * + * @param type Record type. + * @param in Input + * @param encrypted Record was encrypted. + * @return Deserialized record. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed. + */ + WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, + boolean encrypted) throws IOException, IgniteCheckedException { WALRecord res; switch (type) { @@ -326,7 +531,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { in.readFully(arr); - res = new PageSnapshot(new FullPageId(pageId, cacheId), arr); + res = new PageSnapshot(new FullPageId(pageId, cacheId), arr, encrypted ? 
realPageSize : pageSize); break; @@ -401,7 +606,19 @@ public class RecordDataV1Serializer implements RecordDataSerializer { List<DataEntry> entries = new ArrayList<>(entryCnt); for (int i = 0; i < entryCnt; i++) - entries.add(readDataEntry(in)); + entries.add(readPlainDataEntry(in)); + + res = new DataRecord(entries, 0L); + + break; + + case ENCRYPTED_DATA_RECORD: + entryCnt = in.readInt(); + + entries = new ArrayList<>(entryCnt); + + for (int i = 0; i < entryCnt; i++) + entries.add(readEncryptedDataEntry(in)); res = new DataRecord(entries, 0L); @@ -911,8 +1128,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer { return res; } - /** {@inheritDoc} */ - @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { + /** + * Write {@code rec} to {@code buf} without encryption. + * + * @param rec Record to serialize. + * @param buf Output buffer. + * @throws IgniteCheckedException If failed. + */ + void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { switch (rec.type()) { case PAGE_RECORD: PageSnapshot snap = (PageSnapshot)rec; @@ -997,8 +1220,14 @@ public class RecordDataV1Serializer implements RecordDataSerializer { buf.putInt(dataRec.writeEntries().size()); - for (DataEntry dataEntry : dataRec.writeEntries()) - putDataEntry(buf, dataEntry); + boolean encrypted = isDataRecordEncrypted(dataRec); + + for (DataEntry dataEntry : dataRec.writeEntries()) { + if (encrypted) + putEncryptedDataEntry(buf, dataEntry); + else + putPlainDataEntry(buf, dataEntry); + } break; @@ -1482,8 +1711,36 @@ public class RecordDataV1Serializer implements RecordDataSerializer { /** * @param buf Buffer to write to. * @param entry Data entry. + * @throws IgniteCheckedException If failed. 
+ */ + void putEncryptedDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException { + DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(entry.cacheId()); + + if (desc != null && needEncryption(desc.groupId())) { + int clSz = entrySize(entry); + + ByteBuffer clData = ByteBuffer.allocate(clSz); + + putPlainDataEntry(clData, entry); + + clData.rewind(); + + buf.put(ENCRYPTED); + + writeEncryptedData(desc.groupId(), null, clData, buf); + } + else { + buf.put(PLAIN); + + putPlainDataEntry(buf, entry); + } + } + + /** + * @param buf Buffer to write to. + * @param entry Data entry. */ - static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException { + void putPlainDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException { buf.putInt(entry.cacheId()); if (!entry.key().putValue(buf)) @@ -1550,8 +1807,35 @@ public class RecordDataV1Serializer implements RecordDataSerializer { /** * @param in Input to read from. * @return Read entry. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed. + */ + DataEntry readEncryptedDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + boolean needDecryption = in.readByte() == ENCRYPTED; + + if (needDecryption) { + if (encSpi == null) { + skipEncryptedRecord(in, false); + + return new EncryptedDataEntry(); + } + + T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, false); + + if (clData.get1() == null) + return null; + + return readPlainDataEntry(clData.get1()); + } + + return readPlainDataEntry(in); + } + + /** + * @param in Input to read from. + * @return Read entry. 
*/ - DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + DataEntry readPlainDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { int cacheId = in.readInt(); int keySize = in.readInt(); @@ -1622,6 +1906,33 @@ public class RecordDataV1Serializer implements RecordDataSerializer { } /** + * @param rec Record. + * @return Real record type. + */ + RecordType recordType(WALRecord rec) { + if (needEncryption(rec)) + return ENCRYPTED_RECORD; + + if (rec.type() != DATA_RECORD) + return rec.type(); + + return isDataRecordEncrypted((DataRecord)rec) ? ENCRYPTED_DATA_RECORD : DATA_RECORD; + } + + /** + * @param rec Data record. + * @return {@code True} if this data record should be encrypted. + */ + boolean isDataRecordEncrypted(DataRecord rec) { + for (DataEntry e : rec.writeEntries()) { + if(needEncryption(cctx.cacheContext(e.cacheId()).groupId())) + return true; + } + + return false; + } + + /** * @param buf Buffer to read from. * @return Read map. */ @@ -1683,10 +1994,22 @@ public class RecordDataV1Serializer implements RecordDataSerializer { * @throws IgniteCheckedException If failed to obtain the length of one of the entries. 
*/ private int dataSize(DataRecord dataRec) throws IgniteCheckedException { + boolean encrypted = isDataRecordEncrypted(dataRec); + int sz = 0; - for (DataEntry entry : dataRec.writeEntries()) - sz += entrySize(entry); + for (DataEntry entry : dataRec.writeEntries()) { + int clSz = entrySize(entry); + + if (needEncryption(cctx.cacheContext(entry.cacheId()).groupId())) + sz += encSpi.encryptedSize(clSz) + 1 /* encrypted flag */ + 4 /* groupId */ + 4 /* data size */; + else { + sz += clSz; + + if (encrypted) + sz += 1 /* encrypted flag */; + } + } return sz; } @@ -1735,4 +2058,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer { return size; } + + public static class EncryptedDataEntry extends DataEntry { + EncryptedDataEntry() { + super(0, null, null, READ, null, null, 0, 0, 0); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index b760547..5f29dd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -35,6 +35,8 @@ import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; 
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; @@ -42,12 +44,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.record.Header /** * Record data V2 serializer. */ -public class RecordDataV2Serializer implements RecordDataSerializer { +public class RecordDataV2Serializer extends RecordDataV1Serializer implements RecordDataSerializer { /** Length of HEADER record data. */ - static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4; - - /** V1 data serializer delegate. */ - private final RecordDataV1Serializer delegateSerializer; + private static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4; /** Serializer of {@link TxRecord} records. */ private final TxRecordSerializer txRecordSerializer; @@ -55,15 +54,16 @@ public class RecordDataV2Serializer implements RecordDataSerializer { /** * Create an instance of V2 data serializer. * - * @param delegateSerializer V1 data serializer. + * @param cctx Cache shared context. */ - public RecordDataV2Serializer(RecordDataV1Serializer delegateSerializer) { - this.delegateSerializer = delegateSerializer; + public RecordDataV2Serializer(GridCacheSharedContext cctx) { + super(cctx); + this.txRecordSerializer = new TxRecordSerializer(); } /** {@inheritDoc} */ - @Override public int size(WALRecord rec) throws IgniteCheckedException { + @Override protected int plainSize(WALRecord rec) throws IgniteCheckedException { switch (rec.type()) { case HEADER_RECORD: return HEADER_RECORD_DATA_SIZE; @@ -81,7 +81,7 @@ public class RecordDataV2Serializer implements RecordDataSerializer { return 18 + cacheStatesSize + (walPtr == null ? 
0 : 16); case DATA_RECORD: - return delegateSerializer.size(rec) + 8/*timestamp*/; + return super.plainSize(rec) + 8/*timestamp*/; case SNAPSHOT: return 8 + 1; @@ -93,14 +93,15 @@ public class RecordDataV2Serializer implements RecordDataSerializer { return txRecordSerializer.size((TxRecord)rec); default: - return delegateSerializer.size(rec); + return super.plainSize(rec); } } /** {@inheritDoc} */ - @Override public WALRecord readRecord( - WALRecord.RecordType type, - ByteBufferBackedDataInput in + @Override WALRecord readPlainRecord( + RecordType type, + ByteBufferBackedDataInput in, + boolean encrypted ) throws IOException, IgniteCheckedException { switch (type) { case CHECKPOINT_RECORD: @@ -130,9 +131,21 @@ public class RecordDataV2Serializer implements RecordDataSerializer { List<DataEntry> entries = new ArrayList<>(entryCnt); for (int i = 0; i < entryCnt; i++) - entries.add(delegateSerializer.readDataEntry(in)); + entries.add(readPlainDataEntry(in)); + + return new DataRecord(entries, timeStamp); + + case ENCRYPTED_DATA_RECORD: + entryCnt = in.readInt(); + timeStamp = in.readLong(); + + entries = new ArrayList<>(entryCnt); + + for (int i = 0; i < entryCnt; i++) + entries.add(readEncryptedDataEntry(in)); return new DataRecord(entries, timeStamp); + case SNAPSHOT: long snpId = in.readLong(); byte full = in.readByte(); @@ -150,13 +163,12 @@ public class RecordDataV2Serializer implements RecordDataSerializer { return txRecordSerializer.read(in); default: - return delegateSerializer.readRecord(type, in); + return super.readPlainRecord(type, in, encrypted); } - } /** {@inheritDoc} */ - @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { + @Override protected void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { if (rec instanceof HeaderRecord) throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer"); @@ -193,8 +205,14 @@ public class 
RecordDataV2Serializer implements RecordDataSerializer { buf.putInt(dataRec.writeEntries().size()); buf.putLong(dataRec.timestamp()); - for (DataEntry dataEntry : dataRec.writeEntries()) - RecordDataV1Serializer.putDataEntry(buf, dataEntry); + boolean encrypted = isDataRecordEncrypted(dataRec); + + for (DataEntry dataEntry : dataRec.writeEntries()) { + if (encrypted) + putEncryptedDataEntry(buf, dataEntry); + else + putPlainDataEntry(buf, dataEntry); + } break; @@ -221,7 +239,7 @@ public class RecordDataV2Serializer implements RecordDataSerializer { break; default: - delegateSerializer.writeRecord(rec, buf); + super.writePlainRecord(rec, buf); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java index 2e2e2f8..c149817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java @@ -77,11 +77,8 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { recordDeserializeFilter); case 2: - RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer( - new RecordDataV1Serializer(cctx)); - return new RecordV2Serializer( - dataV2Serializer, + new RecordDataV2Serializer(cctx), needWritePointer, marshalledMode, skipPositionCheck, 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index afd770d..c65f37c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -170,7 +170,7 @@ public class RecordV1Serializer implements RecordSerializer { /** {@inheritDoc} */ @Override public void writeWithHeaders(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException { // Write record type. - putRecordType(buf, rec); + putRecordType(buf, dataSerializer.recordType(rec)); // SWITCH_SEGMENT_RECORD should have only type, no need to write pointer. if (rec.type() == SWITCH_SEGMENT_RECORD) @@ -326,10 +326,10 @@ public class RecordV1Serializer implements RecordSerializer { * Writes record type to given {@code buf}. * * @param buf Buffer to write record type. - * @param rec WAL record. + * @param type WAL record type. 
*/ - static void putRecordType(ByteBuffer buf, WALRecord rec) { - buf.put((byte)(rec.type().ordinal() + 1)); + static void putRecordType(ByteBuffer buf, RecordType type) { + buf.put((byte)(type.ordinal() + 1)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index e112522..d27a331 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -174,7 +174,7 @@ public class RecordV2Serializer implements RecordSerializer { ByteBuffer buf ) throws IgniteCheckedException { // Write record type. - RecordV1Serializer.putRecordType(buf, record); + RecordV1Serializer.putRecordType(buf, dataSerializer.recordType(record)); // SWITCH_SEGMENT_RECORD should have only type, no need to write pointer. 
if (record.type() == SWITCH_SEGMENT_RECORD) http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index 81855fc..51a65bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -70,6 +70,9 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { * @param initiatingNodeId Node initiated state change. * @param storedCfgs Configurations read from persistent store. * @param activate New cluster state. + * @param baselineTopology Baseline topology. + * @param forceChangeBaselineTopology Force change baseline topology flag. + * @param timestamp Timestamp. 
*/ public ChangeGlobalStateMessage( UUID reqId, @@ -78,8 +81,7 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { boolean activate, BaselineTopology baselineTopology, boolean forceChangeBaselineTopology, - long timestamp - ) { + long timestamp) { assert reqId != null; assert initiatingNodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index c4a3126..e933757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -1160,6 +1160,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I ctx.task().onActivate(ctx); + ctx.encryption().onActivate(ctx); + if (log.isInfoEnabled()) log.info("Successfully performed final activation steps [nodeId=" + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 8e66102..25b9cb8 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -69,6 +69,7 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.SqlConnectorConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.events.Event; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.NoOpFailureHandler; @@ -99,6 +100,8 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; +import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.apache.ignite.spi.eventstorage.EventStorageSpi; import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; @@ -220,6 +223,7 @@ public class PlatformConfigurationUtils { ccfg.setQueryDetailMetricsSize(in.readInt()); ccfg.setQueryParallelism(in.readInt()); ccfg.setSqlSchema(in.readString()); + ccfg.setEncryptionEnabled(in.readBoolean()); int qryEntCnt = in.readInt(); @@ -702,6 +706,7 @@ public class PlatformConfigurationUtils { readCacheConfigurations(in, cfg, ver); readDiscoveryConfiguration(in, cfg); + readEncryptionConfiguration(in, cfg, ver); if (in.readBoolean()) { TcpCommunicationSpi comm = new TcpCommunicationSpi(); @@ -943,6 +948,30 @@ public class PlatformConfigurationUtils { } /** + * Reads encryption configuration + * @param in Reader. 
+ * @param cfg Configuration. + * @param ver Client version. + */ + private static void readEncryptionConfiguration(BinaryRawReaderEx in, IgniteConfiguration cfg, + ClientListenerProtocolVersion ver) { + if (ver.compareTo(VER_1_2_0) < 0 || !in.readBoolean()) { + cfg.setEncryptionSpi(new NoopEncryptionSpi()); + + return; + } + + KeystoreEncryptionSpi enc = new KeystoreEncryptionSpi(); + + enc.setMasterKeyName(in.readString()); + enc.setKeySize(in.readInt()); + enc.setKeyStorePath(in.readString()); + enc.setKeyStorePassword(in.readCharArray()); + + cfg.setEncryptionSpi(enc); + } + + /** * Writes cache configuration. * * @param writer Writer. @@ -1003,6 +1032,7 @@ public class PlatformConfigurationUtils { writer.writeInt(ccfg.getQueryDetailMetricsSize()); writer.writeInt(ccfg.getQueryParallelism()); writer.writeString(ccfg.getSqlSchema()); + writer.writeBoolean(ccfg.isEncryptionEnabled()); Collection<QueryEntity> qryEntities = ccfg.getQueryEntities(); @@ -1253,6 +1283,7 @@ public class PlatformConfigurationUtils { w.writeInt(0); writeDiscoveryConfiguration(w, cfg.getDiscoverySpi()); + writeEncryptionConfiguration(w, cfg.getEncryptionSpi(), ver); CommunicationSpi comm = cfg.getCommunicationSpi(); @@ -1462,6 +1493,34 @@ public class PlatformConfigurationUtils { } /** + * Writes encryption configuration. + * + * @param w Writer. + * @param enc Encryption Spi. + * @param ver Client version. + */ + private static void writeEncryptionConfiguration(BinaryRawWriter w, EncryptionSpi enc, + ClientListenerProtocolVersion ver) { + if (ver.compareTo(VER_1_2_0) < 0) + return; + + if (enc instanceof NoopEncryptionSpi) { + w.writeBoolean(false); + + return; + } + + KeystoreEncryptionSpi keystoreEnc = (KeystoreEncryptionSpi)enc; + + w.writeBoolean(true); + + w.writeString(keystoreEnc.getMasterKeyName()); + w.writeInt(keystoreEnc.getKeySize()); + w.writeString(keystoreEnc.getKeyStorePath()); + w.writeCharArray(keystoreEnc.getKeyStorePwd()); + } + + /** * Writes enum as byte. 
* * @param w Writer. http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index eb3f2a7..b5b104d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1484,13 +1484,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param writeSyncMode Write synchronization mode. * @param backups Backups. * @param ifNotExists Quietly ignore this command if table already exists. + * @param encrypted Encrypted flag. * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") public void dynamicTableCreate(String schemaName, QueryEntity entity, String templateName, String cacheName, String cacheGroup, @Nullable String dataRegion, String affinityKey, @Nullable CacheAtomicityMode atomicityMode, - @Nullable CacheWriteSynchronizationMode writeSyncMode, @Nullable Integer backups, boolean ifNotExists) - throws IgniteCheckedException { + @Nullable CacheWriteSynchronizationMode writeSyncMode, @Nullable Integer backups, boolean ifNotExists, + boolean encrypted) throws IgniteCheckedException { assert !F.isEmpty(templateName); assert backups == null || backups >= 0; @@ -1534,6 +1535,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (backups != null) ccfg.setBackups(backups); + ccfg.setEncryptionEnabled(encrypted); ccfg.setSqlSchema(schemaName); ccfg.setSqlEscapeAll(true); ccfg.setQueryEntities(Collections.singleton(entity)); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index b8ba742..3ffbb00 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -32,10 +32,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.ObjectInput; +import java.io.ObjectInputStream; import java.io.ObjectOutput; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.PrintStream; import java.io.Reader; +import java.io.Serializable; import java.io.StringWriter; import java.io.Writer; import java.lang.annotation.Annotation; @@ -107,6 +110,7 @@ import java.util.LinkedList; import java.util.List; import 
java.util.Map; import java.util.NoSuchElementException; +import java.util.Random; import java.util.ServiceLoader; import java.util.Set; import java.util.StringTokenizer; @@ -477,6 +481,9 @@ public abstract class IgniteUtils { /** Ignite Work Directory. */ public static final String IGNITE_WORK_DIR = System.getenv(IgniteSystemProperties.IGNITE_WORK_DIR); + /** Random is used to get random server node to authentication from client node. */ + private static final Random RND = new Random(System.currentTimeMillis()); + /** Clock timer. */ private static Thread timer; @@ -10339,6 +10346,43 @@ public abstract class IgniteUtils { } /** + * Serialize object to byte array. + * + * @param obj Object. + * @return Serialized object. + */ + public static byte[] toBytes(Serializable obj) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos)) { + + oos.writeObject(obj); + oos.flush(); + + return bos.toByteArray(); + } + catch (IOException e) { + throw new IgniteException(e); + } + } + + /** + * Deserialize object from byte array. + * + * @param data Serialized object. + * @return Object. + */ + public static <T> T fromBytes(byte[] data) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bis)) { + + return (T)ois.readObject(); + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteException(e); + } + } + + /** * Return count of regular file in the directory (including in sub-directories) * * @param dir path to directory @@ -10580,6 +10624,27 @@ public abstract class IgniteUtils { } /** + * @param ctx Kernel context. + * @return Random alive server node. 
+ */ + public static ClusterNode randomServerNode(GridKernalContext ctx) { + Collection<ClusterNode> aliveNodes = ctx.discovery().aliveServerNodes(); + + int rndIdx = RND.nextInt(aliveNodes.size()) + 1; + + int i = 0; + ClusterNode rndNode = null; + + for (Iterator<ClusterNode> it = aliveNodes.iterator(); i < rndIdx && it.hasNext(); i++) + rndNode = it.next(); + + if (rndNode == null) + assert rndNode != null; + + return rndNode; + } + + /** * */ public static class ReentrantReadWriteLockTracer implements ReadWriteLock { http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index f9c4d9d..ce5076b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -298,13 +298,13 @@ public class GridFunc { * @param delim Delimiter (optional). * @return Concatenated string. */ - public static String concat(Iterable<String> c, @Nullable String delim) { + public static String concat(Iterable<?> c, @Nullable String delim) { A.notNull(c, "c"); IgniteReducer<? super String, String> f = new StringConcatReducer(delim); - for (String x : c) - if (!f.collect(x)) + for (Object x : c) + if (!f.collect(x == null ? 
null : x.toString())) break; return f.reduce(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java new file mode 100644 index 0000000..693cefd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.encryption; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.spi.IgniteSpi; + +/** + * SPI provides encryption features for an Ignite. + */ +public interface EncryptionSpi extends IgniteSpi { + /** + * Returns master key digest. + * Should always return same digest for a same key. + * Digest used for a configuration consistency check. + * + * @return Master key digest. 
+ */ + byte[] masterKeyDigest(); + + /** + * Creates new key for an encryption/decryption of cache persistent data: pages, WAL records. + * + * @return Newly created encryption key. + * @throws IgniteException If key creation failed. + */ + Serializable create() throws IgniteException; + + /** + * Encrypts data. + * + * @param data Data to encrypt. + * @param key Encryption key. + * @param res Destination buffer. + */ + void encrypt(ByteBuffer data, Serializable key, ByteBuffer res); + + /** + * Encrypts data without padding info. + * + * @param data Data to encrypt. + * @param key Encryption key. + * @param res Destination buffer. + */ + void encryptNoPadding(ByteBuffer data, Serializable key, ByteBuffer res); + + /** + * Decrypts data encrypted with {@link #encrypt(ByteBuffer, Serializable, ByteBuffer)} + * + * @param data Data to decrypt. + * @param key Encryption key. + */ + byte[] decrypt(byte[] data, Serializable key); + + /** + * Decrypts data encrypted with {@link #encryptNoPadding(ByteBuffer, Serializable, ByteBuffer)} + * + * @param data Data to decrypt. + * @param key Encryption key. + */ + void decryptNoPadding(ByteBuffer data, Serializable key, ByteBuffer res); + + /** + * Encrypts key. + * Adds some info to check key integrity on decryption. + * + * @param key Key to encrypt. + * @return Encrypted key. + */ + byte[] encryptKey(Serializable key); + + /** + * Decrypts key and checks it integrity. + * + * @param key Key to decrypt. + * @return Encrypted key. + */ + Serializable decryptKey(byte[] key); + + /** + * @param dataSize Size of plain data in bytes. + * @return Size of encrypted data in bytes for padding encryption mode. + */ + int encryptedSize(int dataSize); + + /** + * @param dataSize Size of plain data in bytes. + * @return Size of encrypted data in bytes for nopadding encryption mode. + */ + int encryptedSizeNoPadding(int dataSize); + + /** + * @return Encrypted data block size. 
+ */ + int blockSize(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionKey.java new file mode 100644 index 0000000..c2577c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionKey.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.encryption.keystore; + +import java.io.Serializable; +import java.security.Key; +import java.util.Arrays; +import java.util.Objects; +import org.jetbrains.annotations.Nullable; + +/** + * {@code EncryptionKey} implementation based on java security. + * + * @see Key + * @see KeystoreEncryptionSpi + */ +public final class KeystoreEncryptionKey implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Encryption key. + */ + private final Key k; + + /** + * Key digest. 
+ */ + @Nullable final byte[] digest; + + /** + * @param k Encryption key. + * @param digest Message digest. + */ + KeystoreEncryptionKey(Key k, @Nullable byte[] digest) { + this.k = k; + this.digest = digest; + } + + /** + * Encryption key. + */ + public Key key() { + return k; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + KeystoreEncryptionKey key = (KeystoreEncryptionKey)o; + + return Objects.equals(k, key.k) && + Arrays.equals(digest, key.digest); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = Objects.hash(k); + + result = 31 * result + Arrays.hashCode(digest); + + return result; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ae9c180/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java new file mode 100644 index 0000000..beba015 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.encryption.keystore; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.net.URL; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.KeyStore; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.KeyGenerator; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.SecretKey; +import javax.crypto.ShortBufferException; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.spi.encryption.EncryptionSpi; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiException; +import org.jetbrains.annotations.Nullable; + +import static javax.crypto.Cipher.DECRYPT_MODE; +import static 
javax.crypto.Cipher.ENCRYPT_MODE; + +/** + * EncryptionSPI implementation base on JDK provided cipher algorithm implementations. + * + * @see EncryptionSpi + * @see KeystoreEncryptionKey + */ +public class KeystoreEncryptionSpi extends IgniteSpiAdapter implements EncryptionSpi { + /** + * Default key store entry name to store Encryption master key. + */ + public static final String DEFAULT_MASTER_KEY_NAME = "ignite.master.key"; + + /** + * Algorithm supported by implementation. + */ + public static final String CIPHER_ALGO = "AES"; + + /** + * Default encryption key size; + */ + public static final int DEFAULT_KEY_SIZE = 256; + + /** + * Full name of cipher algorithm. + */ + private static final String AES_WITH_PADDING = "AES/CBC/PKCS5Padding"; + + /** + * Full name of cipher algorithm without padding. + */ + private static final String AES_WITHOUT_PADDING = "AES/CBC/NoPadding"; + + /** + * Algorithm used for digest calculation. + */ + private static final String DIGEST_ALGO = "SHA-512"; + + /** + * Data block size. + */ + private static final int BLOCK_SZ = 16; + + /** + * Path to master key store. + */ + private String keyStorePath; + + /** + * Key store password. + */ + private char[] keyStorePwd; + + /** + * Key size. + */ + private int keySize = DEFAULT_KEY_SIZE; + + /** + * Master key name. + */ + private String masterKeyName = DEFAULT_MASTER_KEY_NAME; + + /** + * Master key. + */ + private KeystoreEncryptionKey masterKey; + + /** Logger. 
*/ + @LoggerResource + protected IgniteLogger log; + + /** Ignite */ + @IgniteInstanceResource + protected Ignite ignite; + + /** */ + private ThreadLocal<Cipher> aesWithPadding = ThreadLocal.withInitial(() -> { + try { + return Cipher.getInstance(AES_WITH_PADDING); + } + catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new IgniteException(e); + } + }); + + /** */ + private ThreadLocal<Cipher> aesWithoutPadding = ThreadLocal.withInitial(() -> { + try { + return Cipher.getInstance(AES_WITHOUT_PADDING); + } + catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new IgniteException(e); + } + }); + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { + assertParameter(!F.isEmpty(keyStorePath), "KeyStorePath shouldn't be empty"); + assertParameter(keyStorePwd != null && keyStorePwd.length > 0, + "KeyStorePassword shouldn't be empty"); + + try (InputStream keyStoreFile = keyStoreFile()) { + assertParameter(keyStoreFile != null, keyStorePath + " doesn't exists!"); + + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + + ks.load(keyStoreFile, keyStorePwd); + + if (log != null) + log.info("Successfully load keyStore [path=" + keyStorePath + "]"); + + masterKey = new KeystoreEncryptionKey(ks.getKey(masterKeyName, keyStorePwd), null); + } + catch (GeneralSecurityException | IOException e) { + throw new IgniteSpiException(e); + } + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + ensureStarted(); + + //empty. 
+ } + + /** {@inheritDoc} */ + @Override public byte[] masterKeyDigest() { + ensureStarted(); + + return makeDigest(masterKey.key().getEncoded()); + } + + /** {@inheritDoc} */ + @Override public KeystoreEncryptionKey create() throws IgniteException { + ensureStarted(); + + try { + KeyGenerator gen = KeyGenerator.getInstance(CIPHER_ALGO); + + gen.init(keySize); + + SecretKey key = gen.generateKey(); + + return new KeystoreEncryptionKey(key, makeDigest(key.getEncoded())); + } + catch (NoSuchAlgorithmException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void encrypt(ByteBuffer data, Serializable key, ByteBuffer res) { + doEncryption(data, aesWithPadding.get(), key, res); + } + + /** {@inheritDoc} */ + @Override public void encryptNoPadding(ByteBuffer data, Serializable key, ByteBuffer res) { + doEncryption(data, aesWithoutPadding.get(), key, res); + } + + /** {@inheritDoc} */ + @Override public byte[] decrypt(byte[] data, Serializable key) { + assert key instanceof KeystoreEncryptionKey; + + ensureStarted(); + + try { + SecretKeySpec keySpec = new SecretKeySpec(((KeystoreEncryptionKey)key).key().getEncoded(), CIPHER_ALGO); + + Cipher cipher = aesWithPadding.get(); + + cipher.init(DECRYPT_MODE, keySpec, new IvParameterSpec(data, 0, cipher.getBlockSize())); + + return cipher.doFinal(data, cipher.getBlockSize(), data.length - cipher.getBlockSize()); + } + catch (InvalidAlgorithmParameterException | InvalidKeyException | IllegalBlockSizeException | + BadPaddingException e) { + throw new IgniteSpiException(e); + } + } + + /** {@inheritDoc} */ + @Override public void decryptNoPadding(ByteBuffer data, Serializable key, ByteBuffer res) { + assert key instanceof KeystoreEncryptionKey; + + ensureStarted(); + + try { + SecretKeySpec keySpec = new SecretKeySpec(((KeystoreEncryptionKey)key).key().getEncoded(), CIPHER_ALGO); + + Cipher cipher = aesWithoutPadding.get(); + + byte[] iv = new byte[cipher.getBlockSize()]; + + data.get(iv); + 
+ cipher.init(DECRYPT_MODE, keySpec, new IvParameterSpec(iv)); + + cipher.doFinal(data, res); + } + catch (InvalidAlgorithmParameterException | InvalidKeyException | IllegalBlockSizeException | + ShortBufferException | BadPaddingException e) { + throw new IgniteSpiException(e); + } + } + + /** + * @param data Plain data. + * @param cipher Cipher. + * @param key Encryption key. + */ + private void doEncryption(ByteBuffer data, Cipher cipher, Serializable key, ByteBuffer res) { + assert key instanceof KeystoreEncryptionKey; + + ensureStarted(); + + try { + SecretKeySpec keySpec = new SecretKeySpec(((KeystoreEncryptionKey)key).key().getEncoded(), CIPHER_ALGO); + + byte[] iv = initVector(cipher); + + res.put(iv); + + cipher.init(ENCRYPT_MODE, keySpec, new IvParameterSpec(iv)); + + cipher.doFinal(data, res); + } + catch (ShortBufferException | InvalidAlgorithmParameterException | InvalidKeyException | + IllegalBlockSizeException | BadPaddingException e) { + throw new IgniteSpiException(e); + } + } + + /** {@inheritDoc} */ + @Override public byte[] encryptKey(Serializable key) { + assert key instanceof KeystoreEncryptionKey; + + byte[] serKey = U.toBytes(key); + + byte[] res = new byte[encryptedSize(serKey.length)]; + + encrypt(ByteBuffer.wrap(serKey), masterKey, ByteBuffer.wrap(res)); + + return res; + } + + /** {@inheritDoc} */ + @Override public KeystoreEncryptionKey decryptKey(byte[] data) { + byte[] serKey = decrypt(data, masterKey); + + KeystoreEncryptionKey key = U.fromBytes(serKey); + + byte[] digest = makeDigest(key.key().getEncoded()); + + if (!Arrays.equals(key.digest, digest)) + throw new IgniteException("Key is broken!"); + + return key; + + } + + /** {@inheritDoc} */ + @Override public int encryptedSize(int dataSize) { + return encryptedSize(dataSize, AES_WITH_PADDING); + } + + /** {@inheritDoc} */ + @Override public int encryptedSizeNoPadding(int dataSize) { + return encryptedSize(dataSize, AES_WITHOUT_PADDING); + } + + /** {@inheritDoc} */ + @Override 
public int blockSize() { + return BLOCK_SZ; + } + + /** + * @param dataSize Data size. + * @param algo Encryption algorithm + * @return Encrypted data size. + */ + private int encryptedSize(int dataSize, String algo) { + int cntBlocks; + + switch (algo) { + case AES_WITH_PADDING: + cntBlocks = 2; + break; + + case AES_WITHOUT_PADDING: + cntBlocks = 1; + break; + + default: + throw new IllegalStateException("Unknown algorithm: " + algo); + } + + return (dataSize/BLOCK_SZ + cntBlocks)*BLOCK_SZ; + } + + /** + * Calculates message digest. + * + * @param msg Message. + * @return Digest. + */ + private byte[] makeDigest(byte[] msg) { + try { + MessageDigest md = MessageDigest.getInstance(DIGEST_ALGO); + + return md.digest(msg); + } + catch (NoSuchAlgorithmException e) { + throw new IgniteException(e); + } + } + + /** + * @param cipher Cipher. + * @return Init vector for encryption. + * @see <a href="https://en.wikipedia.org/wiki/Initialization_vector">Initialization vector</a> + */ + private byte[] initVector(Cipher cipher) { + byte[] iv = new byte[cipher.getBlockSize()]; + + ThreadLocalRandom.current().nextBytes(iv); + + return iv; + } + + /** + * {@code keyStorePath} could be absolute path or path to classpath resource. + * + * @return File for {@code keyStorePath}. + */ + private InputStream keyStoreFile() throws IOException { + File abs = new File(keyStorePath); + + if (abs.exists()) + return new FileInputStream(abs); + + URL clsPthRes = KeystoreEncryptionSpi.class.getClassLoader().getResource(keyStorePath); + + if (clsPthRes != null) + return clsPthRes.openStream(); + + return null; + } + + /** + * Ensures spi started. + * + * @throws IgniteException If spi not started. + */ + private void ensureStarted() throws IgniteException { + if (started()) + return; + + throw new IgniteException("EncryptionSpi is not started!"); + } + + /** + * Gets path to jdk keyStore that stores master key. + * + * @return Key store path. 
+ */ + public String getKeyStorePath() { + return keyStorePath; + } + + /** + * Sets path to jdk keyStore that stores master key. + * + * @param keyStorePath Path to JDK KeyStore. + */ + public void setKeyStorePath(String keyStorePath) { + assert !F.isEmpty(keyStorePath) : "KeyStore path shouldn't be empty"; + assert !started() : "Spi already started"; + + this.keyStorePath = keyStorePath; + } + + /** + * Gets key store password. + * + * @return Key store password. + */ + public char[] getKeyStorePwd() { + return keyStorePwd; + } + + /** + * Sets password to access KeyStore. + * + * @param keyStorePassword Password for Key Store. + */ + public void setKeyStorePassword(char[] keyStorePassword) { + assert keyStorePassword != null && keyStorePassword.length > 0; + assert !started() : "Spi already started"; + + this.keyStorePwd = keyStorePassword; + } + + /** + * Gets encryption key size. + * + * @return Encryption key size. + */ + public int getKeySize() { + return keySize; + } + + /** + * Sets encryption key size. + * + * @param keySize Key size. + */ + public void setKeySize(int keySize) { + assert !started() : "Spi already started"; + + this.keySize = keySize; + } + + /** + * Gets master key name. + * + * @return Master key name. + */ + public String getMasterKeyName() { + return masterKeyName; + } + + /** + * Sets mater key name. + * + * @param masterKeyName Master key name. + */ + public void setMasterKeyName(String masterKeyName) { + assert !started() : "Spi already started"; + + this.masterKeyName = masterKeyName; + } +}
