http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index a4d9e95..b36c2db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExp import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; @@ -108,7 +109,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { ) throws IgniteCheckedException { super(log, sharedCtx, - FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION), + new RecordSerializerFactoryImpl(sharedCtx), ioFactory, BUF_SIZE); this.keepBinary = keepBinary; @@ -136,7 +137,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { @NotNull File... 
walFiles) throws IgniteCheckedException { super(log, sharedCtx, - FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION), + new RecordSerializerFactoryImpl(sharedCtx), ioFactory, BUF_SIZE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java index 4fa6232..35c94a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java @@ -24,8 +24,11 @@ import org.apache.ignite.internal.util.typedef.internal.S; * Header record. */ public class HeaderRecord extends WALRecord { - /** */ - public static final long MAGIC = 0xB0D045A_CE7ED045AL; + /** Magic of regular WAL segment. */ + public static final long REGULAR_MAGIC = 0xB0D045A_CE7ED045AL; + + /** Magic of WAL segment with skipped physical records. 
*/ + public static final long COMPACTED_MAGIC = 0x4E07AE0_E573A694EL; /** Serializer version */ private final int ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java new file mode 100644 index 0000000..9654748 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.record; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; + +/** + * Utility class for handling WAL record types. 
+ */ +public final class RecordTypes { + /** */ + public static final Set<WALRecord.RecordType> DELTA_TYPE_SET = new HashSet<>(); + + static { + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.INIT_NEW_PAGE_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_ADD_ROOT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_CUT_ROOT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_INIT_NEW_ROOT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_RECYCLE); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INSERT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_LEFTMOST_CHILD); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_COUNT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REPLACE); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REMOVE); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INNER_REPLACE); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_REMOVE_ID); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FORWARD_PAGE_SPLIT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_EXISTING_PAGE_SPLIT); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_MERGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_NEXT); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_PREVIOUS); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_INIT_NEW_PAGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_ADD_PAGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_REMOVE_PAGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_INIT); + 
DELTA_TYPE_SET.add(WALRecord.RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS); + DELTA_TYPE_SET.add(WALRecord.RecordType.TRACKING_PAGE_DELTA); + DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID); + DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID); + DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_NEXT_SNAPSHOT_ID); + DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_ALLOCATED_INDEX); + DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_LIST_META_RESET_COUNT_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_UPDATE_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT2); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java new file mode 100644 index 0000000..88c5f42 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; + +/** + * Interface to provide size, read and write operations with WAL records + * <b>without any headers and meta information</b>. + */ +public interface RecordDataSerializer { + /** + * Calculates size of record data. + * + * @param record WAL record. + * @return Size of record in bytes. + * @throws IgniteCheckedException If it's unable to calculate record data size. + */ + int size(WALRecord record) throws IgniteCheckedException; + + /** + * Reads record data of {@code type} from buffer {@code in}. + * + * @param type Record type. + * @param in Buffer to read. + * @return WAL record. + * @throws IOException In case of I/O problems. + * @throws IgniteCheckedException If it's unable to read record. + */ + WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException; + + /** + * Writes record data to buffer {@code buf}. + * + * @param record WAL record. + * @param buf Buffer to write. + * @throws IgniteCheckedException If it's unable to write record. 
+ */ + void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index e583df3..d478917 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -87,7 +87,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -283,8 +282,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { return /*cacheId*/ 4 + /*pageId*/ 8; case SWITCH_SEGMENT_RECORD: - // CRC is not loaded for switch segment. 
- return -CRC_SIZE; + return 0; case TX_RECORD: return txRecordSerializer.sizeOfTxRecord((TxRecord)record); @@ -391,9 +389,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer { case HEADER_RECORD: long magic = in.readLong(); - if (magic != HeaderRecord.MAGIC) - throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + - ", actual=" + U.hexLong(magic) + ']'); + if (magic != HeaderRecord.REGULAR_MAGIC && magic != HeaderRecord.COMPACTED_MAGIC) + throw new EOFException("Magic is corrupted [actual=" + U.hexLong(magic) + ']'); int ver = in.readInt(); @@ -912,7 +909,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { break; case HEADER_RECORD: - buf.putLong(HeaderRecord.MAGIC); + buf.putLong(HeaderRecord.REGULAR_MAGIC); buf.putInt(((HeaderRecord)record).version()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index c02f36e..16a81a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -24,9 +24,9 @@ import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord; import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; -import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; /** @@ -54,6 +54,9 @@ public class RecordDataV2Serializer implements RecordDataSerializer { case DATA_RECORD: return delegateSerializer.size(record) + 8/*timestamp*/; + case SNAPSHOT: + return 8 + 1; + default: return delegateSerializer.size(record); } @@ -76,6 +79,12 @@ public class RecordDataV2Serializer implements RecordDataSerializer { return new DataRecord(entries, timeStamp); + case SNAPSHOT: + long snpId = in.readLong(); + byte full = in.readByte(); + + return new SnapshotRecord(snpId, full == 1); + default: return delegateSerializer.readRecord(type, in); } @@ -98,6 +107,14 @@ public class RecordDataV2Serializer implements RecordDataSerializer { break; + case SNAPSHOT: + SnapshotRecord snpRec = (SnapshotRecord)record; + + buf.putLong(snpRec.getSnapshotId()); + buf.put(snpRec.isFull() ? 
(byte)1 : 0); + + break; + default: delegateSerializer.writeRecord(record, buf); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java new file mode 100644 index 0000000..c5760ab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; + +/** + * Record serializer. + */ +public interface RecordSerializer { + /** + * @return serializer version + */ + public int version(); + + /** + * Calculates record size in bytes, including expected WAL pointer, CRC and type field. + * + * @param record Record. + * @return Size in bytes. + */ + public int size(WALRecord record) throws IgniteCheckedException; + + /** + * @param record Entry to write. + * @param buf Buffer. + */ + public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; + + /** + * Loads record from input. + * + * @param in Data input to read data from. + * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file + * @return Read entry. 
+ */ + public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; + + /** + * Flag to write (or not) wal pointer to record + */ + public boolean writePointer(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java new file mode 100644 index 0000000..f46c315 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.lang.IgniteBiPredicate; + +/** + * Factory for creating {@link RecordSerializer}. + */ +public interface RecordSerializerFactory { + /** + * Factory method for creating a {@link RecordSerializer}. + * + * @param ver Serializer version. + * @return record serializer. + */ + public RecordSerializer createSerializer(int ver) throws IgniteCheckedException; + + /** + * TODO: This flag was added under IGNITE-6029, but is still unused. Should be either handled or removed. + * + * @param writePointer Write pointer flag. + */ + public RecordSerializerFactory writePointer(boolean writePointer); + + /** + * Specifies deserialization filter. Created serializer will read bulk {@link FilteredRecord} instead of actual + * record if record type/pointer doesn't satisfy filter. + * + * @param readTypeFilter Read type filter. + */ + public RecordSerializerFactory recordDeserializeFilter(IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter); + + /** + * If marshalledMode is on, created serializer will read {@link MarshalledRecord} with raw binary data instead of + * actual record. + * Useful for copying binary data from WAL. + * + * @param marshalledMode Marshalled mode. + */ + public RecordSerializerFactory marshalledMode(boolean marshalledMode); + + /** + * If skipPositionCheck is true, created serializer won't check that actual position of record in file is equal to + * position in saved record's WALPointer. + * Must be true if we are reading from compacted WAL segment. + * + * @param skipPositionCheck Skip position check. 
+ */ + public RecordSerializerFactory skipPositionCheck(boolean skipPositionCheck); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java new file mode 100644 index 0000000..468392a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java @@ -0,0 +1,133 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.lang.IgniteBiPredicate; + +/** + * + */ +public class RecordSerializerFactoryImpl implements RecordSerializerFactory { + /** Context. */ + private GridCacheSharedContext cctx; + + /** Write pointer. */ + private boolean writePointer; + + /** Read record filter. */ + private IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter; + + /** + * Marshalled mode flag. + * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead. + */ + private boolean marshalledMode; + + /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */ + private boolean skipPositionCheck; + + /** + * @param cctx Cctx. 
+ */ + public RecordSerializerFactoryImpl(GridCacheSharedContext cctx) { + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public RecordSerializer createSerializer(int ver) throws IgniteCheckedException { + if (ver <= 0) + throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file)."); + + switch (ver) { + case 1: + return new RecordV1Serializer(new RecordDataV1Serializer(cctx), + writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter); + + case 2: + RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx)); + + return new RecordV2Serializer(dataV2Serializer, + writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter); + + default: + throw new IgniteCheckedException("Failed to create a serializer with the given version " + + "(forward compatibility is not supported): " + ver); + } + } + + /** + * @return Write pointer. + */ + public boolean writePointer() { + return writePointer; + } + + /** {@inheritDoc} */ + @Override public RecordSerializerFactoryImpl writePointer(boolean writePointer) { + this.writePointer = writePointer; + + return this; + } + + /** + * @return Read type filter. + */ + public IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter() { + return recordDeserializeFilter; + } + + /** {@inheritDoc} */ + @Override public RecordSerializerFactoryImpl recordDeserializeFilter( + IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter) { + this.recordDeserializeFilter = readTypeFilter; + + return this; + } + + /** + * @return Marshalled mode. Records are not deserialized in this mode; {@link MarshalledRecord} with binary representation is read instead. 
+ */ + public boolean marshalledMode() { + return marshalledMode; + } + + /** {@inheritDoc} */ + @Override public RecordSerializerFactoryImpl marshalledMode(boolean marshalledMode) { + this.marshalledMode = marshalledMode; + + return this; + } + + /** + * @return Skip position check flag. Should be set for reading compacted wal file with skipped physical records. + */ + public boolean skipPositionCheck() { + return skipPositionCheck; + } + + /** {@inheritDoc} */ + @Override public RecordSerializerFactoryImpl skipPositionCheck(boolean skipPositionCheck) { + this.skipPositionCheck = skipPositionCheck; + + return this; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index c4e1bf2..d460705 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -21,22 +21,26 @@ import java.io.DataInput; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; @@ -72,6 +76,32 @@ public class RecordV1Serializer implements RecordSerializer { /** Write pointer. */ private final boolean writePointer; + /** + * Record type filter. + * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter. + */ + private final IgniteBiPredicate<RecordType, WALPointer> recordFilter; + + /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */ + private final boolean skipPositionCheck; + + /** + * Marshalled mode. + * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead. + */ + private final boolean marshalledMode; + + /** Thread-local heap byte buffer. 
*/ + private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + ByteBuffer buf = ByteBuffer.allocate(4096); + + buf.order(GridUnsafe.NATIVE_BYTE_ORDER); + + return buf; + } + }; + /** Record read/write functional interface. */ private final RecordIO recordIO = new RecordIO() { @@ -89,11 +119,36 @@ public class RecordV1Serializer implements RecordSerializer { FileWALPointer ptr = readPosition(in); - if (!F.eq(ptr, expPtr)) + if (!skipPositionCheck && !F.eq(ptr, expPtr)) throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr + ", readPtr=" + ptr + ']', null); - return dataSerializer.readRecord(recType, in); + final WALRecord rec = dataSerializer.readRecord(recType, in); + + if (recordFilter != null && !recordFilter.apply(rec.type(), ptr)) + return new FilteredRecord(); + else if (marshalledMode) { + ByteBuffer buf = heapTlb.get(); + + int recordSize = size(rec); + + if (buf.capacity() < recordSize) + heapTlb.set(buf = ByteBuffer.allocate(recordSize * 3 / 2).order(ByteOrder.nativeOrder())); + else + buf.clear(); + + rec.position(ptr); + + writeRecord(rec, buf); + + buf.flip(); + + assert buf.remaining() == recordSize; + + return new MarshalledRecord(rec.type(), rec.position(), buf); + } + else + return rec; } /** {@inheritDoc} */ @@ -111,13 +166,19 @@ public class RecordV1Serializer implements RecordSerializer { /** * Create an instance of V1 serializer. - * * @param dataSerializer V1 data serializer. * @param writePointer Write pointer. + * @param marshalledMode Marshalled mode. + * @param skipPositionCheck Skip position check mode. + * @param recordFilter Record type filter. 
{@link FilteredRecord} is deserialized instead of original record */ - public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer) { + public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer, + boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) { this.dataSerializer = dataSerializer; this.writePointer = writePointer; + this.recordFilter = recordFilter; + this.skipPositionCheck = skipPositionCheck; + this.marshalledMode = marshalledMode; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 5eb45ac..05f2a24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -18,21 +18,26 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; import java.io.DataInput; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.*; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE; @@ -57,6 +62,32 @@ public class RecordV2Serializer implements RecordSerializer { /** Write pointer. */ private final boolean writePointer; + /** + * Marshalled mode. + * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead. + */ + private final boolean marshalledMode; + + /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */ + private final boolean skipPositionCheck; + + /** Thread-local heap byte buffer. */ + private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + ByteBuffer buf = ByteBuffer.allocate(4096); + + buf.order(GridUnsafe.NATIVE_BYTE_ORDER); + + return buf; + } + }; + + /** + * Record type filter. 
+ * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter. + */ + private final IgniteBiPredicate<RecordType, WALPointer> recordFilter; + /** Record read/write functional interface. */ private final RecordIO recordIO = new RecordIO() { @@ -75,9 +106,47 @@ public class RecordV2Serializer implements RecordSerializer { if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD) throw new SegmentEofException("Reached end of segment", null); - FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr); + FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck); + + if (recordFilter != null && !recordFilter.apply(recType, ptr)) { + int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE; + + assert toSkip >= 0 : "Too small saved record length: " + ptr; + + if (in.skipBytes(toSkip) < toSkip) + throw new EOFException("Reached end of file while reading record: " + ptr); + + return new FilteredRecord(); + } + else if (marshalledMode) { + ByteBuffer buf = heapTlb.get(); + + if (buf.capacity() < ptr.length()) + heapTlb.set(buf = ByteBuffer.allocate(ptr.length() * 3 / 2).order(ByteOrder.nativeOrder())); + else + buf.clear(); + + buf.put((byte)(recType.ordinal() + 1)); + + buf.putLong(ptr.index()); + buf.putInt(ptr.fileOffset()); + buf.putInt(ptr.length()); + + in.readFully(buf.array(), buf.position(), ptr.length() - buf.position()); + buf.position(ptr.length()); + + // Unwind reading CRC. + in.buffer().position(in.buffer().position() - CRC_SIZE); + + buf.flip(); + + assert buf.remaining() == ptr.length(); + + return new MarshalledRecord(recType, ptr, buf); + } + else + return dataSerializer.readRecord(recType, in); - return dataSerializer.readRecord(recType, in); } /** {@inheritDoc} */ @@ -98,12 +167,18 @@ public class RecordV2Serializer implements RecordSerializer { /** * Create an instance of Record V2 serializer. - * * @param dataSerializer V2 data serializer. 
+ * @param marshalledMode Marshalled mode. + * @param skipPositionCheck Skip position check mode. + * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record */ - public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer) { + public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer, + boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) { this.dataSerializer = dataSerializer; this.writePointer = writePointer; + this.marshalledMode = marshalledMode; + this.skipPositionCheck = skipPositionCheck; + this.recordFilter = recordFilter; } /** {@inheritDoc} */ @@ -133,12 +208,14 @@ public class RecordV2Serializer implements RecordSerializer { /** * @param in Data input to read pointer from. + * @param skipPositionCheck Flag for skipping position check. * @return Read file WAL pointer. * @throws IOException If failed to write. */ public static FileWALPointer readPositionAndCheckPoint( DataInput in, - WALPointer expPtr + WALPointer expPtr, + boolean skipPositionCheck ) throws IgniteCheckedException, IOException { long idx = in.readLong(); int fileOffset = in.readInt(); @@ -146,7 +223,7 @@ public class RecordV2Serializer implements RecordSerializer { FileWALPointer p = (FileWALPointer)expPtr; - if (!F.eq(idx, p.index()) || !F.eq(fileOffset, p.fileOffset())) + if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOffset, p.fileOffset()))) throw new WalSegmentTailReachedException( "WAL segment tail is reached. 
[ " + "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " + http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index a8df0c6..0a278d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -1680,6 +1680,7 @@ public class PlatformConfigurationUtils { .setMetricsRateTimeInterval(in.readLong()) .setCheckpointWriteOrder(CheckpointWriteOrder.fromOrdinal(in.readInt())) .setWriteThrottlingEnabled(in.readBoolean()) + .setWalCompactionEnabled(in.readBoolean()) .setSystemRegionInitialSize(in.readLong()) .setSystemRegionMaxSize(in.readLong()) .setPageSize(in.readInt()) @@ -1774,6 +1775,7 @@ public class PlatformConfigurationUtils { w.writeLong(cfg.getMetricsRateTimeInterval()); w.writeInt(cfg.getCheckpointWriteOrder().ordinal()); w.writeBoolean(cfg.isWriteThrottlingEnabled()); + w.writeBoolean(cfg.isWalCompactionEnabled()); w.writeLong(cfg.getSystemRegionInitialSize()); w.writeLong(cfg.getSystemRegionMaxSize()); w.writeInt(cfg.getPageSize()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java index af8f679..107b467 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.OpenOption; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; @@ -41,7 +42,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; -import java.nio.file.OpenOption; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.READ; @@ -120,16 +120,17 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { */ private void flushingErrorTest() throws Exception { final IgniteEx grid = startGrid(0); - grid.active(true); - IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE); + try { + grid.active(true); - final int iterations = 100; + IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE); + + final int iterations = 100; - try { for (int i = 0; i < iterations; i++) { Transaction tx = grid.transactions().txStart( - TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); + TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); cache.put(i, "testValue" + i); @@ -144,8 +145,7 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { // We should await successful stop of node. 
GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return grid.context().gateway().getState() == GridKernalState.STOPPED; } }, getTestTimeout()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index 35d85d1..c6d58e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -27,9 +27,9 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index b357877..00627b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -113,7 +113,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { /** */ private int walSegmentSize; - /** Logger only. */ + /** Log only. */ private boolean logOnly; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java new file mode 100644 index 0000000..d927676 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class IgniteWalRecoveryWithCompactionTest extends IgniteWalRecoveryTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getDataStorageConfiguration().setWalCompactionEnabled(true); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java index 7500fdc..60fb1d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -76,6 +76,8 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCheckDifferentSerializerVersions() throws Exception { + System.setProperty(IGNITE_WAL_SERIALIZER_VERSION, "1"); + IgniteEx ig0 = (IgniteEx)startGrid(); IgniteWriteAheadLogManager wal0 = ig0.context().cache().context().wal(); @@ -302,6 +304,11 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest { deleteWorkFiles(); } + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IGNITE_WAL_SERIALIZER_VERSION); + } + /** * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java new file mode 100644 index 0000000..6b9f06a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java @@ -0,0 +1,312 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.Arrays; +import java.util.Comparator; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; + +/** + * + */ +public class WalCompactionTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Wal segment size. */ + private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024; + + /** Cache name. */ + public static final String CACHE_NAME = "cache"; + + /** Entries count. */ + public static final int ENTRIES = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(200 * 1024 * 1024)) + .setWalMode(WALMode.LOG_ONLY) + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalHistorySize(500) + .setWalCompactionEnabled(true)); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("cache"); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + ccfg.setBackups(0); + + cfg.setCacheConfiguration(ccfg); + cfg.setConsistentId(name); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testApplyingUpdatesFromCompactedWal() throws Exception { + IgniteEx ig = (IgniteEx)startGrids(3); + ig.active(true); + + IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + + for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total. + final byte[] val = new byte[20000]; + + val[i] = 1; + + cache.put(i, val); + } + + // Spam WAL to move all data records to compressible WAL zone. + for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE])); + + // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + + Thread.sleep(15_000); // Allow compressor to archive WAL segments. + + String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName(); + + File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); + File walDir = new File(dbDir, "wal"); + File archiveDir = new File(walDir, "archive"); + File nodeArchiveDir = new File(archiveDir, nodeFolderName); + File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip"); + + assertTrue(walSegment.exists()); + assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half. 
+ + stopAllGrids(); + + File nodeLfsDir = new File(dbDir, nodeFolderName); + File cpMarkersDir = new File(nodeLfsDir, "cp"); + + File[] cpMarkers = cpMarkersDir.listFiles(); + + assertNotNull(cpMarkers); + assertTrue(cpMarkers.length > 0); + + File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME); + File[] lfsFiles = cacheDir.listFiles(); + + assertNotNull(lfsFiles); + assertTrue(lfsFiles.length > 0); + + // Enforce reading WAL from the very beginning at the next start. + for (File f : cpMarkers) + f.delete(); + + for (File f : lfsFiles) + f.delete(); + + ig = (IgniteEx)startGrids(3); + ig.active(true); + + cache = ig.cache(CACHE_NAME); + + boolean fail = false; + + // Check that all data is recovered from compacted WAL. + for (int i = 0; i < ENTRIES; i++) { + byte[] arr = cache.get(i); + + if (arr == null) { + System.out.println(">>> Missing: " + i); + + fail = true; + } + else if (arr[i] != 1) { + System.out.println(">>> Corrupted: " + i); + + fail = true; + } + } + + assertFalse(fail); + } + + /** + * @throws Exception If failed. 
+ */ + public void testSeekingStartInCompactedSegment() throws Exception { + IgniteEx ig = (IgniteEx)startGrids(3); + ig.active(true); + + IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + + for (int i = 0; i < 100; i++) { + final byte[] val = new byte[20000]; + + val[i] = 1; + + cache.put(i, val); + } + + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + + String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName(); + + File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); + File nodeLfsDir = new File(dbDir, nodeFolderName); + File cpMarkersDir = new File(nodeLfsDir, "cp"); + + final File[] cpMarkersToSave = cpMarkersDir.listFiles(); + + assert cpMarkersToSave != null; + assertTrue(cpMarkersToSave.length >= 2); + + Arrays.sort(cpMarkersToSave, new Comparator<File>() { + @Override public int compare(File o1, File o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + + for (int i = 100; i < ENTRIES; i++) { // At least 20MB of raw data in total. + final byte[] val = new byte[20000]; + + val[i] = 1; + + cache.put(i, val); + } + + // Spam WAL to move all data records to compressible WAL zone. + for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE])); + + // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + + Thread.sleep(15_000); // Allow compressor to archive WAL segments. 
+ + File walDir = new File(dbDir, "wal"); + File archiveDir = new File(walDir, "archive"); + File nodeArchiveDir = new File(archiveDir, nodeFolderName); + File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip"); + + assertTrue(walSegment.exists()); + assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half. + + stopAllGrids(); + + File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return !(name.equals(cpMarkersToSave[0].getName()) || name.equals(cpMarkersToSave[1].getName())); + } + }); + + assertNotNull(cpMarkers); + assertTrue(cpMarkers.length > 0); + + File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME); + File[] lfsFiles = cacheDir.listFiles(); + + assertNotNull(lfsFiles); + assertTrue(lfsFiles.length > 0); + + // Enforce reading WAL from the very beginning at the next start. + for (File f : cpMarkers) + f.delete(); + + for (File f : lfsFiles) + f.delete(); + + ig = (IgniteEx)startGrids(3); + ig.active(true); + + cache = ig.cache(CACHE_NAME); + + int missing = 0; + + for (int i = 0; i < 100; i++) { + if (!cache.containsKey(i)) + missing++; + } + + System.out.println(">>> Missing " + missing + " entries logged before WAL iteration start"); + assertTrue(missing > 0); + + boolean fail = false; + + // Check that all data is recovered from compacted WAL. 
+ for (int i = 100; i < ENTRIES; i++) { + byte[] arr = cache.get(i); + + if (arr == null) { + System.out.println(">>> Missing: " + i); + + fail = true; + } + else if (arr[i] != 1) { + System.out.println(">>> Corrupted: " + i); + + fail = true; + } + } + + assertFalse(fail); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 72450b8..b95208c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -77,6 +77,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ + @Override public void allowCompressionUntil(WALPointer ptr) { + // No-op. 
+ } + + /** {@inheritDoc} */ @Override public boolean reserved(WALPointer ptr) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index adfdb2c..17833ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests; import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest; @@ -93,6 +94,8 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalSerializerVersionTest.class); + suite.addTestSuite(WalCompactionTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 114f630..ea30ce2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest; /** @@ -49,6 +50,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(WalRecoveryTxLogicalRecordsTest.class); suite.addTestSuite(IgniteWalRecoveryTest.class); + suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class); suite.addTestSuite(IgnitePdsNoActualWalHistoryTest.class); suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class); suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs index 09b3fe4..ae3c0ed 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs +++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs @@ -134,6 +134,11 @@ namespace Apache.Ignite.Core.Configuration public const bool DefaultWriteThrottlingEnabled = false; /// <summary> + /// Default value for <see cref="WalCompactionEnabled"/>. + /// </summary> + public const bool DefaultWalCompactionEnabled = false; + + /// <summary> /// Default size of a memory chunk reserved for system cache initially. /// </summary> public const long DefaultSystemRegionInitialSize = 40 * 1024 * 1024; @@ -174,6 +179,7 @@ namespace Apache.Ignite.Core.Configuration WalPath = DefaultWalPath; CheckpointWriteOrder = DefaultCheckpointWriteOrder; WriteThrottlingEnabled = DefaultWriteThrottlingEnabled; + WalCompactionEnabled = DefaultWalCompactionEnabled; SystemRegionInitialSize = DefaultSystemRegionInitialSize; SystemRegionMaxSize = DefaultSystemRegionMaxSize; PageSize = DefaultPageSize; @@ -207,6 +213,7 @@ namespace Apache.Ignite.Core.Configuration MetricsRateTimeInterval = reader.ReadLongAsTimespan(); CheckpointWriteOrder = (CheckpointWriteOrder)reader.ReadInt(); WriteThrottlingEnabled = reader.ReadBoolean(); + WalCompactionEnabled = reader.ReadBoolean(); SystemRegionInitialSize = reader.ReadLong(); SystemRegionMaxSize = reader.ReadLong(); @@ -256,6 +263,7 @@ namespace Apache.Ignite.Core.Configuration writer.WriteTimeSpanAsLong(MetricsRateTimeInterval); writer.WriteInt((int)CheckpointWriteOrder); writer.WriteBoolean(WriteThrottlingEnabled); + writer.WriteBoolean(WalCompactionEnabled); writer.WriteLong(SystemRegionInitialSize); writer.WriteLong(SystemRegionMaxSize); @@ -420,6 +428,14 @@ namespace Apache.Ignite.Core.Configuration public bool WriteThrottlingEnabled { get; set; } /// <summary> + /// Gets or sets flag indicating whether WAL compaction is enabled. + /// If true, system filters and compresses WAL archive in background. + /// Compressed WAL archive gets automatically decompressed on demand. 
+ /// </summary> + [DefaultValue(DefaultWalCompactionEnabled)] + public bool WalCompactionEnabled { get; set; } + + /// <summary> /// Gets or sets the size of a memory chunk reserved for system needs. /// </summary> [DefaultValue(DefaultSystemRegionInitialSize)] http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 98607b6..c6eb3c1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -1806,6 +1806,11 @@ <xs:documentation>Threads that generate dirty pages too fast during ongoing checkpoint will be throttled.</xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="walCompactionEnabled" type="xs:boolean"> + <xs:annotation> + <xs:documentation>If true, system will filter and compress WAL archive in background. Compressed WAL archive gets automatically decompressed on demand.</xs:documentation> + </xs:annotation> + </xs:attribute> <xs:attribute name="pageSize" type="xs:int"> <xs:annotation> <xs:documentation>Size of the memory page.</xs:documentation>
