http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java new file mode 100644 index 0000000..df932e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridKernalGateway; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; +import org.apache.ignite.internal.managers.collision.GridCollisionManager; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; +import org.apache.ignite.internal.managers.failover.GridFailoverManager; +import org.apache.ignite.internal.managers.indexing.GridIndexingManager; +import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; +import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; +import org.apache.ignite.internal.processors.cluster.ClusterProcessor; +import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; +import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; +import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; +import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; 
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper; +import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; +import org.apache.ignite.internal.processors.igfs.IgfsHelper; +import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; +import org.apache.ignite.internal.processors.job.GridJobProcessor; +import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; +import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; +import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; +import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; +import org.apache.ignite.internal.processors.port.GridPortProcessor; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; +import org.apache.ignite.internal.processors.rest.GridRestProcessor; +import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter; +import org.apache.ignite.internal.processors.security.GridSecurityProcessor; +import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; +import org.apache.ignite.internal.processors.service.GridServiceProcessor; +import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; +import org.apache.ignite.internal.processors.task.GridTaskProcessor; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; +import org.apache.ignite.internal.util.IgniteExceptionRegistry; +import org.apache.ignite.internal.util.StripedExecutor; +import org.apache.ignite.plugin.PluginNotFoundException; +import org.apache.ignite.plugin.PluginProvider; +import 
org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy grid kernal context. Only the logger passed to the constructor is supplied: every other accessor + * returns {@code null} or {@code false} or does nothing, except {@link #iterator()}, which returns an empty iterator. + */ +public class StandaloneGridKernalContext implements GridKernalContext { + /** Logger returned by both {@code log(...)} methods. */ + private final IgniteLogger log; + + /** + * @param log Logger. + */ + StandaloneGridKernalContext(IgniteLogger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Override public List<GridComponent> components() { + return null; + } + + /** {@inheritDoc} */ + @Override public UUID localNodeId() { + return null; + } + + /** {@inheritDoc} */ + @Override public String igniteInstanceName() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log(String ctgr) { + return log; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log(Class<?> cls) { + return log; + } + + /** {@inheritDoc} */ + @Override public boolean isStopping() { + return false; + } + + /** {@inheritDoc} */ + @Override public GridKernalGateway gateway() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteEx grid() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration config() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridTaskProcessor task() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridAffinityProcessor affinity() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridJobProcessor job() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridTimeoutProcessor timeout() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridResourceProcessor resource() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridJobMetricsProcessor jobMetric() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheProcessor cache() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridClusterStateProcessor state() { + return null; + } + + /** {@inheritDoc}
*/ + @Override public GridTaskSessionProcessor session() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridClosureProcessor closure() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridServiceProcessor service() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridPortProcessor ports() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteScheduleProcessorAdapter schedule() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridRestProcessor rest() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridSegmentationProcessor segmentation() { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> DataStreamProcessor<K, V> dataStream() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsProcessorAdapter igfs() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsHelper igfsHelper() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridContinuousProcessor continuous() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopProcessorAdapter hadoop() { + return null; + } + + /** {@inheritDoc} */ + @Override public PoolProcessor pools() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridMarshallerMappingProcessor mapping() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopHelper hadoopHelper() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService utilityCachePool() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteCacheObjectProcessor cacheObjects() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridQueryProcessor query() { + return null; + } + + /** {@inheritDoc} */ + @Override public SqlListenerProcessor sqlListener() { + return null; + } + + /** {@inheritDoc} */ + @Override
public IgnitePluginProcessor plugins() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridDeploymentManager deploy() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridIoManager io() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridDiscoveryManager discovery() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCheckpointManager checkpoint() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridEventStorageManager event() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridFailoverManager failover() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCollisionManager collision() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridSecurityProcessor security() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridLoadBalancerManager loadBalancing() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridIndexingManager indexing() { + return null; + } + + /** {@inheritDoc} */ + @Override public DataStructuresProcessor dataStructures() { + return null; + } + + /** {@inheritDoc} */ + @Override public void markSegmented() { } + + /** {@inheritDoc} */ + @Override public boolean segmented() { + return false; + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + return false; + } + + /** {@inheritDoc} */ + @Override public GridPerformanceSuggestions performance() { + return null; + } + + /** {@inheritDoc} */ + @Override public String userVersion(ClassLoader ldr) { + return null; + } + + /** {@inheritDoc} */ + @Override public PluginProvider pluginProvider(String name) throws PluginNotFoundException { + return null; + } + + /** {@inheritDoc} */ + @Override public <T> T createComponent(Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getExecutorService() { + return null; + } + +
/** {@inheritDoc} */ + @Override public ExecutorService getServiceExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getSystemExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public StripedExecutor getStripedExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getManagementExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getPeerClassLoadingExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getIgfsExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getDataStreamerExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getRestExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getAffinityExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public ExecutorService getIndexingExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getQueryExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, ?
extends ExecutorService> customExecutors() { + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService getSchemaExecutorService() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteExceptionRegistry exceptionRegistry() { + return null; + } + + /** {@inheritDoc} */ + @Override public Object nodeAttribute(String key) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasNodeAttribute(String key) { + return false; + } + + /** {@inheritDoc} */ + @Override public Object addNodeAttribute(String key, Object val) { + return null; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> nodeAttributes() { + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterProcessor cluster() { + return null; + } + + /** {@inheritDoc} */ + @Override public MarshallerContextImpl marshallerContext() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean clientNode() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean clientDisconnected() { + return false; + } + + /** {@inheritDoc} */ + @Override public PlatformProcessor platform() { + return null; + } + + /** + * {@inheritDoc} + * <p>This dummy context holds no components, so an empty iterator is returned rather than {@code null}, + * which honors the {@code @NotNull} annotation on this method. + */ + @NotNull @Override public Iterator<GridComponent> iterator() { + return java.util.Collections.<GridComponent>emptyIterator(); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java new file mode 100644 index 0000000..85a8724 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; + +/** + * Fake database shared manager for the standalone WAL reader tool. + * It exists so that the tool can create an instance and so that the page size setter is published as a public method. + */ +class StandaloneIgniteCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager { + /** + * {@inheritDoc} + * <p>Delegates to the parent implementation without changes; the override only re-declares the setter as public. + */ + @Override public void setPageSize(int pageSize) { + super.setPageSize(pageSize); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java new file mode 100644 index 0000000..f17c112 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.io.DataInput; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; + +/** + * WAL reader iterator, for creation in standalone WAL reader tool. + * Operates either over one directory (directory scan mode) or over an explicit set of files + * (file-by-file iteration mode); does not provide start and end boundaries + */ +class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { + /** */ + private static final long serialVersionUID = 0L; + + /** Record buffer size */ + private static final int BUF_SIZE = 2 * 1024 * 1024; + + /** + * WAL files directory.
Should already contain 'consistent ID' as subfolder. + * <code>null</code> value means file-by-file iteration mode + */ + @Nullable + private File walFilesDir; + + /** + * File descriptors remaining to scan. + * <code>null</code> value means directory scan mode + */ + @Nullable + private List<FileWriteAheadLogManager.FileDescriptor> walFileDescriptors; + + /** + * True if this iterator is used for work dir, false for archive. + * In work dir mode exceptions coming from record reading are logged but not rethrown (file may be not completed). + * Index of file is taken from file itself, not from file name + */ + private boolean workDir; + + /** + * Creates iterator in directory scan mode + * + * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder + * @param log Logger. + * @param sharedCtx Shared context. + * @throws IgniteCheckedException if initialization or advancing to the first record fails + */ + StandaloneWalRecordsIterator( + @NotNull final File walFilesDir, + @NotNull final IgniteLogger log, + @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException { + super(log, + sharedCtx, + new RecordV1Serializer(sharedCtx), + BUF_SIZE); + init(walFilesDir, false, null); + advance(); + } + + /** + * Creates iterator in file-by-file iteration mode. + * + * @param log Logger. + * @param sharedCtx Shared context. + * @param workDir Work directory is scanned, false - archive + * @param walFiles Wal files. + * @throws IgniteCheckedException if initialization or advancing to the first record fails + */ + StandaloneWalRecordsIterator( + @NotNull final IgniteLogger log, + @NotNull final GridCacheSharedContext sharedCtx, + final boolean workDir, + @NotNull final File... walFiles) throws IgniteCheckedException { + super(log, + sharedCtx, + new RecordV1Serializer(sharedCtx), + BUF_SIZE); + /* The workDir flag is stored by init() in file-by-file mode. */ + init(null, workDir, walFiles); + advance(); + } + + /** + * For directory mode sets oldest file as initial segment, + * for file by file mode, converts all files to descriptors and gets oldest as initial.
+ * + * @param walFilesDir directory for directory scan mode + * @param workDir work directory, only for file-by-file mode + * @param walFiles files for file-by-file iteration mode + * @throws IgniteCheckedException if loading file descriptors or scanning file headers fails + */ + private void init( + @Nullable final File walFilesDir, + final boolean workDir, + @Nullable final File[] walFiles) throws IgniteCheckedException { + if (walFilesDir != null) { + FileWriteAheadLogManager.FileDescriptor[] descs = loadFileDescriptors(walFilesDir); + curWalSegmIdx = !F.isEmpty(descs) ? descs[0].getIdx() : 0; + this.walFilesDir = walFilesDir; + this.workDir = false; + } + else { + this.workDir = workDir; + if (workDir) + walFileDescriptors = scanIndexesFromFileHeaders(walFiles); + else + walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles))); + curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0; + } + /* advanceSegment() increments the index before opening a segment, so start one position earlier. */ + curWalSegmIdx--; + + if (log.isDebugEnabled()) + log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx + ']'); + } + + /** + * This method checks all provided files to be correct WAL segments. + * Header record and its position are checked. WAL position is used to determine real index. + * File index from file name is ignored.
+ * Files shorter than {@link RecordV1Serializer#HEADER_RECORD_SIZE} are skipped. + * + * @param allFiles files to scan + * @return list of file descriptors with checked header records, file index is set; the list is sorted + * @throws IgniteCheckedException if IO error occurs; a {@link SegmentEofException} is thrown + * if a file starts with the stop-iteration record type + */ + private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders( + @Nullable final File[] allFiles) throws IgniteCheckedException { + if (allFiles == null || allFiles.length == 0) + return Collections.emptyList(); + + final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = new ArrayList<>(); + + for (File file : allFiles) { + if (file.length() < HEADER_RECORD_SIZE) + continue; + + FileWALPointer ptr; + + try (RandomAccessFile rf = new RandomAccessFile(file, "r")) { + final FileChannel ch = rf.getChannel(); + final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE); + + buf.order(ByteOrder.nativeOrder()); + + final DataInput in = new FileInput(ch, buf); + // Header record must be agnostic to the serializer version. + final int type = in.readUnsignedByte(); + + if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) + throw new SegmentEofException("Reached logical end of the segment", null); + ptr = RecordV1Serializer.readPosition(in); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to scan index from file [" + file + "]", e); + } + + resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index())); + } + Collections.sort(resultingDescs); + return resultingDescs; + } + + /** + * {@inheritDoc} + * <p>Closes the current segment (if any) and opens the next one: in directory scan mode the file is derived + * from the next segment index, in file-by-file mode the next remaining descriptor is taken. + * Returns {@code null} when there are no more descriptors or the segment file is missing. + */ + @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment( + @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException { + + if (curWalSegment != null) + curWalSegment.close(); + + curWalSegmIdx++; + // curHandle.workDir is false + final FileWriteAheadLogManager.FileDescriptor fd; + + if (walFilesDir != null) { + fd = new FileWriteAheadLogManager.FileDescriptor( + new File(walFilesDir, + FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx))); +
} + else { + if (walFileDescriptors.isEmpty()) + return null; //no files to read, stop iteration + + fd = walFileDescriptors.remove(0); + } + + assert fd != null; + + if (log.isDebugEnabled()) + log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.getAbsolutePath() + ']'); + + curRec = null; + try { + return initReadHandle(fd, null); + } + catch (FileNotFoundException e) { + log.info("Missing WAL segment in the archive: " + e.getMessage()); + return null; + } + } + + /** + * {@inheritDoc} + * <p>After delegating to the parent handler, the failure is wrapped into a {@link RuntimeException} and + * reported through the logger. The wrapper is rethrown unless the work directory is being read. + */ + @Override protected void handleRecordException( + @NotNull final Exception e, + @Nullable final FileWALPointer ptr) { + super.handleRecordException(e, ptr); + final RuntimeException ex = new RuntimeException("Record reading problem occurred at file pointer [" + ptr + "]:" + e.getMessage(), e); + + log.error(ex.getMessage(), ex); + if (!workDir) + throw ex; + } + + /** + * {@inheritDoc} + * <p>Drops the current record, closes the current WAL segment and sets the segment index to {@link Integer#MAX_VALUE}. + */ + @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + curRec = null; + + closeCurrentWalSegment(); + + curWalSegmIdx = Integer.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index 0ccd3a0..0a7b3dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -103,21 +103,37 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; /** * Record V1 serializer.
+ * Stores records in following format: + * <ul> + * <li>Record type from {@link RecordType#ordinal()} incremented by 1</li> + * <li>WAL pointer to double check consistency</li> + * <li>Data</li> + * <li>CRC or zero padding</li> + * </ul> */ public class RecordV1Serializer implements RecordSerializer { - /** */ - public static final int HEADER_RECORD_SIZE = /*Type*/1 + /*Pointer */12 + /*Magic*/8 + /*Version*/4 + /*CRC*/4; + /** Length of Type */ + public static final int REC_TYPE_SIZE = 1; + + /** Length of WAL Pointer */ + public static final int FILE_WAL_POINTER_SIZE = 12; + + /** Length of CRC value */ + private static final int CRC_SIZE = 4; /** */ + public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + /*Magic*/8 + /*Version*/4 + CRC_SIZE; + + /** Cache shared context */ private GridCacheSharedContext cctx; - /** */ + /** Size of page used for PageMemory regions */ private int pageSize; - /** */ + /** Cache object processor to reading {@link DataEntry DataEntries} */ private IgniteCacheObjectProcessor co; - /** */ + /** Skip CRC calculation/check flag */ private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false); /** @@ -658,7 +674,7 @@ public class RecordV1Serializer implements RecordSerializer { assert res != null; - res.size((int)(in0.position() - startPos + 4)); // Account for CRC which will be read afterwards. + res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards. return res; } @@ -671,12 +687,16 @@ public class RecordV1Serializer implements RecordSerializer { } /** - * @param in In. + * Loads record from input, does not read CRC value + * + * @param in Input to read record from + * @param expPtr expected WAL pointer for record. 
Used to validate actual position against expected from the file + * @throws SegmentEofException if end of WAL segment reached */ private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { int type = in.readUnsignedByte(); - if (type == 0) + if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) throw new SegmentEofException("Reached logical end of the segment", null); FileWALPointer ptr = readPosition(in); @@ -1212,7 +1232,7 @@ public class RecordV1Serializer implements RecordSerializer { /** {@inheritDoc} */ @SuppressWarnings("CastConflictsWithInstanceof") @Override public int size(WALRecord record) throws IgniteCheckedException { - int commonFields = /* Type */1 + /* Pointer */12 + /*CRC*/4; + int commonFields = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE; switch (record.type()) { case PAGE_RECORD: @@ -1371,7 +1391,7 @@ public class RecordV1Serializer implements RecordSerializer { return commonFields + /*cacheId*/ 4 + /*pageId*/ 8; case SWITCH_SEGMENT_RECORD: - return commonFields; + return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead default: throw new UnsupportedOperationException("Type: " + record.type()); @@ -1379,10 +1399,11 @@ public class RecordV1Serializer implements RecordSerializer { } /** + * Saves position, WAL pointer (requires {@link #FILE_WAL_POINTER_SIZE} bytes) * @param buf Byte buffer to serialize version to. * @param ptr File WAL pointer to write. */ - private void putPosition(ByteBuffer buf, FileWALPointer ptr) { + public static void putPosition(ByteBuffer buf, FileWALPointer ptr) { buf.putLong(ptr.index()); buf.putInt(ptr.fileOffset()); } @@ -1392,7 +1413,7 @@ public class RecordV1Serializer implements RecordSerializer { * @return Read file WAL pointer. * @throws IOException If failed to write. 
*/ - private FileWALPointer readPosition(DataInput in) throws IOException { + public static FileWALPointer readPosition(DataInput in) throws IOException { long idx = in.readLong(); int fileOffset = in.readInt(); http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java index 5561d95..fed8766 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java @@ -181,6 +181,8 @@ public class IgnitePersistentStoreDataStructuresTest extends GridCommonAbstractT for (int i = 0; i < 100; i++) set.add(i); + assertEquals(100, set.size()); + stopAllGrids(); ignite = startGrids(4); http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index 793806e..48d8c21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -297,7 +297,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { final int entryCnt = 10_000; final int initGridCnt = 4; - final IgniteEx ig0 = (IgniteEx)startGrids(initGridCnt); + final Ignite ig0 = startGrids(initGridCnt); ig0.active(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java new file mode 100644 index 0000000..06bcf08 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader; + +import java.io.File; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.events.WalSegmentArchivedEvent; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static 
org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; + +/** + * Test suite for WAL segments reader and event generator. + */ +public class IgniteWalReaderTest extends GridCommonAbstractTest { + /** Wal segments count */ + private static final int WAL_SEGMENTS = 10; + + /** Cache name. */ + private static final String CACHE_NAME = "cache0"; + + /** Fill wal with some data before iterating. Should be true for non local run */ + private static final boolean fillWalBeforeTest = true; + + /** Delete DB dir before test. */ + private static final boolean deleteBefore = true; + + /** Delete DB dir after test. */ + private static final boolean deleteAfter = true; + + /** Dump records to logger. Should be false for non local run */ + private static final boolean dumpRecords = false; + + /** Page size to set */ + public static final int PAGE_SIZE = 4 * 1024; + + /** + * Field for transferring setting from test to getConfig method + * Archive incomplete segment after inactivity milliseconds. 
+ */ + private int archiveIncompleteSegmentAfterInactivityMs = 0; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + final CacheConfiguration<Integer, IgniteWalReaderTest.IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME); + + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + ccfg.setIndexedTypes(Integer.class, IgniteWalReaderTest.IndexedObject.class); + + cfg.setCacheConfiguration(ccfg); + + cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED); + + final MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setPageSize(PAGE_SIZE); + + final MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); + + memPlcCfg.setName("dfltMemPlc"); + memPlcCfg.setInitialSize(1024 * 1024 * 1024); + memPlcCfg.setMaxSize(1024 * 1024 * 1024); + + dbCfg.setMemoryPolicies(memPlcCfg); + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + final PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); + pCfg.setWalHistorySize(1); + pCfg.setWalSegmentSize(1024 * 1024); + pCfg.setWalSegments(WAL_SEGMENTS); + pCfg.setWalMode(WALMode.BACKGROUND); + + if (archiveIncompleteSegmentAfterInactivityMs > 0) + pCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs); + + cfg.setPersistentStoreConfiguration(pCfg); + + final BinaryConfiguration binCfg = new BinaryConfiguration(); + + binCfg.setCompactFooter(false); + + cfg.setBinaryConfiguration(binCfg); + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + stopAllGrids(); + + if (deleteBefore) + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + if 
(deleteAfter) + deleteWorkFiles(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + if (fillWalBeforeTest) + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** + * @throws Exception if failed. + */ + public void testFillWalAndReadRecords() throws Exception { + final int cacheObjectsToWrite = 10000; + + if (fillWalBeforeTest) { + final Ignite ignite0 = startGrid("node0"); + + ignite0.active(true); + + putDummyRecords(ignite0, cacheObjectsToWrite); + + stopGrid("node0"); + } + + final File db = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); + final File wal = new File(db, "wal"); + final File walArchive = new File(wal, "archive"); + final String consistentId = "127_0_0_1_47500"; + final MockWalIteratorFactory mockItFactory = new MockWalIteratorFactory(log, PAGE_SIZE, consistentId, WAL_SEGMENTS); + final WALIterator it = mockItFactory.iterator(wal, walArchive); + final int cntUsingMockIter = iterateAndCount(it); + + log.info("Total records loaded " + cntUsingMockIter); + assert cntUsingMockIter > 0; + assert cntUsingMockIter > cacheObjectsToWrite; + + final File walArchiveDirWithConsistentId = new File(walArchive, consistentId); + final File walWorkDirWithConsistentId = new File(wal, consistentId); + + final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log, PAGE_SIZE); + final int cntArchiveDir = iterateAndCount(factory.iteratorArchiveDirectory(walArchiveDirWithConsistentId)); + + log.info("Total records loaded using directory : " + cntArchiveDir); + + final int cntArchiveFileByFile = iterateAndCount( + factory.iteratorArchiveFiles( + walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER))); + + log.info("Total records loaded using archive directory (file-by-file): " + cntArchiveFileByFile); + + assert cntArchiveFileByFile > cacheObjectsToWrite; + assert cntArchiveDir > 
cacheObjectsToWrite; + assert cntArchiveDir == cntArchiveFileByFile; + //really count2 may be less because work dir correct loading is not supported yet + assert cntUsingMockIter >= cntArchiveDir + : "Mock based reader loaded " + cntUsingMockIter + " records but standalone has loaded only " + cntArchiveDir; + + + final File[] workFiles = walWorkDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); + int cntWork = 0; + + try (WALIterator stIt = factory.iteratorWorkFiles(workFiles)) { + while (stIt.hasNextX()) { + IgniteBiTuple<WALPointer, WALRecord> next = stIt.nextX(); + if (dumpRecords) + log.info("Work. Record: " + next.get2()); + cntWork++; + } + } + log.info("Total records loaded from work: " + cntWork); + + assert cntWork + cntArchiveFileByFile == cntUsingMockIter + : "Work iterator loaded [" + cntWork + "] " + + "Archive iterator loaded [" + cntArchiveFileByFile + "]; " + + "mock iterator [" + cntUsingMockIter + "]"; + + } + + /** + * @param walIter iterator to count, will be closed + * @return count of records + * @throws IgniteCheckedException if failed to iterate + */ + private int iterateAndCount(WALIterator walIter) throws IgniteCheckedException { + int cntUsingMockIter = 0; + + try(WALIterator it = walIter) { + while (it.hasNextX()) { + IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); + if (dumpRecords) + log.info("Record: " + next.get2()); + cntUsingMockIter++; + } + } + return cntUsingMockIter; + } + + /** + * Tests archive completed event is fired + * + * @throws Exception if failed + */ + public void testArchiveCompletedEventFired() throws Exception { + final AtomicBoolean evtRecorded = new AtomicBoolean(); + + final Ignite ignite = startGrid("node0"); + + ignite.active(true); + + final IgniteEvents evts = ignite.events(); + + if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED)) + return; //nothing to test + + evts.localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event e) { + 
WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + long idx = archComplEvt.getAbsWalSegmentIdx(); + log.info("Finished archive for segment [" + idx + ", " + + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + evtRecorded.set(true); + return true; + } + }, EVT_WAL_SEGMENT_ARCHIVED); + + putDummyRecords(ignite, 150); + + stopGrid("node0"); + assert evtRecorded.get(); + } + + /** + * Puts provided number of records to fill WAL + * + * @param ignite ignite instance + * @param recordsToWrite count + */ + private void putDummyRecords(Ignite ignite, int recordsToWrite) { + IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + + for (int i = 0; i < recordsToWrite; i++) + cache0.put(i, new IndexedObject(i)); + } + + /** + * Tests time out based WAL segment archiving + * + * @throws Exception if failure occurs + */ + public void testArchiveIncompleteSegmentAfterInactivity() throws Exception { + final AtomicBoolean waitingForEvt = new AtomicBoolean(); + final CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1); + + archiveIncompleteSegmentAfterInactivityMs = 1000; + + final Ignite ignite = startGrid("node0"); + + ignite.active(true); + + final IgniteEvents evts = ignite.events(); + + evts.localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event e) { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + long idx = archComplEvt.getAbsWalSegmentIdx(); + log.info("Finished archive for segment [" + idx + ", " + + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + if (waitingForEvt.get()) + archiveSegmentForInactivity.countDown(); + return true; + } + }, EVT_WAL_SEGMENT_ARCHIVED); + + putDummyRecords(ignite, 100); + waitingForEvt.set(true); //flag for skipping regular log() and rollOver() + + log.info("Wait for archiving segment for inactive grid started"); + + boolean recordedAfterSleep = + archiveSegmentForInactivity.await(archiveIncompleteSegmentAfterInactivityMs + 1001, 
TimeUnit.MILLISECONDS); + + stopGrid("node0"); + assert recordedAfterSleep; + } + + /** Test object for placing into grid in this test */ + private static class IndexedObject { + /** */ + @QuerySqlField(index = true) + private int iVal; + + /** Data filled with recognizable pattern */ + private byte[] data; + + /** + * @param iVal Integer value. + */ + private IndexedObject(int iVal) { + this.iVal = iVal; + int sz = 40000; + data = new byte[sz]; + for (int i = 0; i < sz; i++) + data[i] = (byte)('A' + (i % 10)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + IndexedObject obj = (IndexedObject)o; + + if (iVal != obj.iVal) + return false; + return Arrays.equals(data, obj.data); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = iVal; + res = 31 * res + Arrays.hashCode(data); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteWalReaderTest.IndexedObject.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java new file mode 100644 index 0000000..95079a0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader; + +import java.io.File; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.jetbrains.annotations.Nullable; +import org.mockito.Mockito; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +/** + * Mockito based WAL iterator provider + */ +public class MockWalIteratorFactory { + /** Logger. */ + private final IgniteLogger log; + + /** Page size. */ + private final int pageSize; + + /** Consistent node id. 
*/ + private final String consistentId; + + /** Segments count in work dir. */ + private int segments; + + /** + * Creates factory + * @param log Logger. + * @param pageSize Page size. + * @param consistentId Consistent id. + * @param segments Segments. + */ + public MockWalIteratorFactory(@Nullable IgniteLogger log, int pageSize, String consistentId, int segments) { + this.log = log == null ? Mockito.mock(IgniteLogger.class) : log; + this.pageSize = pageSize; + this.consistentId = consistentId; + this.segments = segments; + } + + /** + * Creates iterator + * @param wal WAL directory without node id + * @param walArchive WAL archive without node id + * @return iterator + * @throws IgniteCheckedException if IO failed + */ + public WALIterator iterator(File wal, File walArchive) throws IgniteCheckedException { + final PersistentStoreConfiguration persistentCfg1 = Mockito.mock(PersistentStoreConfiguration.class); + + when(persistentCfg1.getWalStorePath()).thenReturn(wal.getAbsolutePath()); + when(persistentCfg1.getWalArchivePath()).thenReturn(walArchive.getAbsolutePath()); + when(persistentCfg1.getWalSegments()).thenReturn(segments); + when(persistentCfg1.getTlbSize()).thenReturn(PersistentStoreConfiguration.DFLT_TLB_SIZE); + when(persistentCfg1.getWalRecordIteratorBufferSize()).thenReturn(PersistentStoreConfiguration.DFLT_WAL_RECORD_ITERATOR_BUFFER_SIZE); + + final IgniteConfiguration cfg = Mockito.mock(IgniteConfiguration.class); + + when(cfg.getPersistentStoreConfiguration()).thenReturn(persistentCfg1); + + final GridKernalContext ctx = Mockito.mock(GridKernalContext.class); + + when(ctx.config()).thenReturn(cfg); + when(ctx.clientNode()).thenReturn(false); + + final GridDiscoveryManager disco = Mockito.mock(GridDiscoveryManager.class); + + when(disco.consistentId()).thenReturn(consistentId); + when(ctx.discovery()).thenReturn(disco); + + final IgniteWriteAheadLogManager mgr = new FileWriteAheadLogManager(ctx); + final GridCacheSharedContext sctx = 
Mockito.mock(GridCacheSharedContext.class); + + when(sctx.kernalContext()).thenReturn(ctx); + when(sctx.discovery()).thenReturn(disco); + + final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class); + + when(database.pageSize()).thenReturn(pageSize); + when(sctx.database()).thenReturn(database); + when(sctx.logger(any(Class.class))).thenReturn(log); + + mgr.start(sctx); + + return mgr.replay(null); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 351f52e..8018705 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -18,22 +18,18 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceMetricsSelfTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest; +import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest; -import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest; -import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest; -import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest; -import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest; /** * @@ -69,6 +65,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgnitePersistentStoreDataStructuresTest.class); + suite.addTestSuite(IgniteWalReaderTest.class); return suite; } }
