This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 41adf7c [ASTERIXDB-2491][TXN] Recovery fixes 41adf7c is described below commit 41adf7c449c4b89370c0c400397924d492f44daf Author: Michael Blow <mb...@apache.org> AuthorDate: Wed Mar 6 21:27:47 2019 -0500 [ASTERIXDB-2491][TXN] Recovery fixes - user model changes: no - storage format changes: yes[1] - interface changes: no Details: - Change field offset type from integer16 to integer32 - Add recovery test - Add version to log entries [1] LogRecord format change with this patch; old LogRecord format can still be processed, however old instances cannot read new log format Change-Id: Iaf14b9a73a0239763bfeb0ce2d81cf952e6d72d3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3065 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> --- .../org/apache/asterix/test/txn/LogRecordTest.java | 108 +++++++++++++++ .../asterix/common/transactions/ILogRecord.java | 44 +----- .../asterix/common/transactions/LogConstants.java | 70 ++++++++++ .../asterix/common/transactions/LogRecord.java | 151 +++++++++++++-------- .../asterix/common/utils/TransactionUtil.java | 10 +- .../org/apache/asterix/test/server/RecoveryIT.java | 1 - .../large_object_100K.1.script.aql | 19 +++ .../large_object_100K/large_object_100K.2.ddl.aql | 36 +++++ .../large_object_100K.3.update.aql | 31 +++++ .../large_object_100K.4.txnqbc.aql | 29 ++++ .../large_object_100K.5.script.aql | 19 +++ .../large_object_100K.6.script.aql | 19 +++ .../large_object_100K.7.txnqar.aql | 29 ++++ .../large_object_100K.8.script.aql | 19 +++ .../large_object_100K/create_and_start.sh | 18 +++ .../large_object_100K/kill_cc_and_nc.sh | 19 +++ .../large_object_100K/stop_and_delete.sh | 20 +++ .../large_object_100K/stop_and_start.sh | 19 +++ .../src/test/resources/transactionts/testsuite.xml | 6 + .../management/service/logging/LogBuffer.java | 3 +- 
.../management/service/logging/LogManager.java | 18 +-- .../management/service/logging/LogReader.java | 34 ++--- .../management/service/recovery/TxnEntityId.java | 6 +- .../am/common/tuples/SimpleTupleReference.java | 14 +- ...eReference.java => SimpleTupleReferenceV0.java} | 55 +------- .../am/common/tuples/SimpleTupleWriter.java | 16 +-- 26 files changed, 613 insertions(+), 200 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java new file mode 100644 index 0000000..b7924d7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.txn; + +import static org.apache.asterix.common.transactions.LogConstants.LOG_SOURCE_MAX; +import static org.apache.asterix.common.transactions.LogConstants.LOG_SOURCE_MIN; +import static org.apache.asterix.common.transactions.LogConstants.VERSION_MAX; +import static org.apache.asterix.common.transactions.LogConstants.VERSION_MIN; + +import java.nio.ByteBuffer; +import java.util.stream.IntStream; + +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.LogSource; +import org.apache.asterix.common.transactions.LogType; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +@FixMethodOrder(MethodSorters.JVM) +public class LogRecordTest { + private static ByteBuffer buffer; + + @BeforeClass + public static void setup() { + buffer = ByteBuffer.allocate(100); + } + + @Test + @SuppressWarnings("squid:S3415") + public void testVersionIdLogSourceRange() { + Assert.assertEquals("min version", 0, VERSION_MIN); + Assert.assertEquals("max version", 62, VERSION_MAX); + Assert.assertEquals("min source", 0, LOG_SOURCE_MIN); + Assert.assertEquals("max source", 3, LOG_SOURCE_MAX); + IntStream.rangeClosed(LOG_SOURCE_MIN, LOG_SOURCE_MAX).forEach( + s -> IntStream.rangeClosed(VERSION_MIN, VERSION_MAX).forEach(v -> testVersionSourceCombo(v, (byte) s))); + } + + @Test + public void testIllegalVersionIds() { + try { + testVersionSourceCombo(63, LogSource.LOCAL); + Assert.fail("expected IllegalArgumentException on version overflow not found"); + } catch (IllegalArgumentException e) { + // ignore - expected + } + try { + testVersionSourceCombo(-1, LogSource.LOCAL); + Assert.fail("expected IllegalArgumentException on version underflow not found"); + } catch (IllegalArgumentException e) { + // ignore - expected + } + } + + @Test + public void testIllegalLogSources() { + LogRecord record = new 
LogRecord(); + try { + record.setLogSource((byte) -1); + Assert.fail("expected IllegalArgumentException on log source underflow not found"); + } catch (IllegalArgumentException e) { + // ignore - expected + } + try { + record.setLogSource((byte) 4); + Assert.fail("expected IllegalArgumentException on log source overflow not found"); + } catch (IllegalArgumentException e) { + // ignore - expected + } + } + + private void testVersionSourceCombo(int version, byte source) { + buffer.clear(); + LogRecord record = new LogRecord(); + record.setLogType(LogType.FLUSH); + record.setVersion(version); + record.setLogSource(source); + record.computeAndSetLogSize(); + Assert.assertEquals("input version", version, record.getVersion()); + Assert.assertEquals("input source", source, record.getLogSource()); + record.writeLogRecord(buffer); + + buffer.flip(); + LogRecord read = new LogRecord(); + read.readLogRecord(buffer); + Assert.assertEquals("read version", version, read.getVersion()); + Assert.assertEquals("read source", source, read.getLogSource()); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 04f9751..1dec3fe 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -31,42 +31,6 @@ public interface ILogRecord { LARGE_RECORD } - int CHKSUM_LEN = Long.BYTES; - int FLDCNT_LEN = Integer.BYTES; - int DS_LEN = Integer.BYTES; - int LOG_SOURCE_LEN = Byte.BYTES; - int LOGRCD_SZ_LEN = Integer.BYTES; - int NEWOP_LEN = Byte.BYTES; - int NEWVALSZ_LEN = Integer.BYTES; - int PKHASH_LEN = Integer.BYTES; - int PKSZ_LEN = Integer.BYTES; - int PRVLSN_LEN = Long.BYTES; - int RS_PARTITION_LEN = Integer.BYTES; - int RSID_LEN = Long.BYTES; - int 
SEQ_NUM_LEN = Long.BYTES; - int TYPE_LEN = Byte.BYTES; - int UUID_LEN = Long.BYTES; - int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES; - int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES; - - int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES; - int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES; - int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN; - int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; - int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; - - int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; - int ENTITY_COMMIT_LOG_BASE_SIZE = - ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN; - int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER; - int FILTER_LOG_BASE_SIZE = - ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN; - int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN - + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN; - int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; - int MARKER_BASE_LOG_SIZE = - ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN; - RecordReadStatus readLogRecord(ByteBuffer buffer); void writeLogRecord(ByteBuffer buffer); @@ -93,7 +57,7 @@ public interface ILogRecord { int getPKHashValue(); - void setPKHashValue(int PKHashValue); + void setPKHashValue(int pkHashValue); long getResourceId(); @@ -133,7 +97,7 @@ public interface ILogRecord { void computeAndSetPKValueSize(); - void setPKValue(ITupleReference PKValue); + void setPKValue(ITupleReference pKValue); void readRemoteLog(ByteBuffer buffer); @@ -191,4 +155,8 @@ public interface ILogRecord { long getFlushingComponentMaxId(); void setFlushingComponentMaxId(long flushingComponentMaxId); + + int getVersion(); + + void setVersion(int version); } diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java new file mode 100644 index 0000000..58d020f --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.transactions; + +public class LogConstants { + + public static final int CHKSUM_LEN = Long.BYTES; + public static final int FLDCNT_LEN = Integer.BYTES; + public static final int DS_LEN = Integer.BYTES; + public static final int LOG_SOURCE_LEN = Byte.BYTES; + public static final int LOGRCD_SZ_LEN = Integer.BYTES; + public static final int NEWOP_LEN = Byte.BYTES; + public static final int NEWVALSZ_LEN = Integer.BYTES; + public static final int PKHASH_LEN = Integer.BYTES; + public static final int PKSZ_LEN = Integer.BYTES; + public static final int PRVLSN_LEN = Long.BYTES; + public static final int RS_PARTITION_LEN = Integer.BYTES; + public static final int RSID_LEN = Long.BYTES; + public static final int SEQ_NUM_LEN = Long.BYTES; + public static final int TYPE_LEN = Byte.BYTES; + public static final int UUID_LEN = Long.BYTES; + public static final int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES; + public static final int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES; + + public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES; + public static final int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES; + public static final int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN; + public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; + public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; + + public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; + public static final int ENTITY_COMMIT_LOG_BASE_SIZE = + ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN; + public static final int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER; + public static final int FILTER_LOG_BASE_SIZE = + ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN; + public static final int FLUSH_LOG_SIZE = 
ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + + FLUSHING_COMPONENT_MINID_LEN + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN; + public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; + public static final int MARKER_BASE_LOG_SIZE = + ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN; + + public static final int V_0 = 0; + public static final int V_1 = 1; + public static final int V_CURRENT = V_1; + public static final int VERSION_MIN = 0; + public static final int VERSION_MAX = (0xff >> 2) - 1; + + public static final int LOG_SOURCE_MIN = 0; + public static final int LOG_SOURCE_MAX = (1 << 2) - 1; + + private LogConstants() { + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index 7a1079d..cfacf71 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.transactions; +import static org.apache.asterix.common.transactions.LogConstants.*; + import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,14 +27,15 @@ import java.util.zip.CRC32; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference; +import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference; +import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReferenceV0; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; /** * == LogRecordFormat == * --------------------------- * [Header1] (10 bytes) : for all log types - * 
LogSource(1) + * LogSourceVersion(1) : high 6 bits are used for log record version; low 2 bits are reserved for LogSource * LogType(1) * TxnId(8) * --------------------------- @@ -60,15 +63,15 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; * --------------------------- */ public class LogRecord implements ILogRecord { - // ------------- fields in a log record (begin) ------------// - private byte logSource; + private int version = V_CURRENT; + private byte logSource = LogSource.LOCAL; private byte logType; private long txnId; private int datasetId; - private int PKHashValue; - private int PKValueSize; - private ITupleReference PKValue; + private int pKHashValue; + private int pKValueSize; + private ITupleReference pKValue; private long resourceId; private int resourcePartition; private int logSize; @@ -86,15 +89,17 @@ public class LogRecord implements ILogRecord { private long flushingComponentMaxId; // ------------- fields in a log record (end) --------------// private final ILogMarkerCallback callback; // A callback for log mark operations - private int PKFieldCnt; + private int pKFieldCnt; private ITransactionContext txnCtx; private volatile long LSN; private final AtomicBoolean isFlushed; private final PrimaryKeyTupleReference readPKValue; - private final SimpleTupleReference readNewValue; - private final SimpleTupleReference readOldValue; + private final ITreeIndexTupleReference readNewValue; + private final ITreeIndexTupleReference readOldValue; + private ITreeIndexTupleReference readNewValueV0; + private ITreeIndexTupleReference readOldValueV0; private final CRC32 checksumGen; - private int[] PKFields; + private int[] pKFields; private PrimaryIndexOperationTracker opTracker; /** @@ -113,7 +118,6 @@ public class LogRecord implements ILogRecord { readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference(); readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference(); checksumGen = new CRC32(); - logSource = LogSource.LOCAL; } 
public LogRecord() { @@ -121,7 +125,7 @@ public class LogRecord implements ILogRecord { } private void doWriteLogRecord(ByteBuffer buffer) { - buffer.put(logSource); + buffer.put((byte) (version << 2 | (logSource & 0xff))); buffer.put(logType); buffer.putLong(txnId); switch (logType) { @@ -172,11 +176,11 @@ public class LogRecord implements ILogRecord { } private void writeEntityValue(ByteBuffer buffer) { - buffer.putInt(PKHashValue); - if (PKValueSize <= 0) { + buffer.putInt(pKHashValue); + if (pKValueSize <= 0) { throw new IllegalStateException("Primary Key Size is less than or equal to 0"); } - buffer.putInt(PKValueSize); + buffer.putInt(pKValueSize); writePKValue(buffer); } @@ -203,13 +207,13 @@ public class LogRecord implements ILogRecord { private void writePKValue(ByteBuffer buffer) { if (logSource == LogSource.LOCAL) { - for (int i = 0; i < PKFieldCnt; i++) { - buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), - PKValue.getFieldLength(PKFields[i])); + for (int i = 0; i < pKFieldCnt; i++) { + buffer.put(pKValue.getFieldData(0), pKValue.getFieldStart(pKFields[i]), + pKValue.getFieldLength(pKFields[i])); } } else { - // since PKValue is already serialized in remote logs, just put it into buffer - buffer.put(PKValue.getFieldData(0), 0, PKValueSize); + // since pKValue is already serialized in remote logs, just put it into buffer + buffer.put(pKValue.getFieldData(0), 0, pKValueSize); } } @@ -253,7 +257,22 @@ public class LogRecord implements ILogRecord { if (buffer.remaining() < ALL_RECORD_HEADER_LEN) { return RecordReadStatus.TRUNCATED; } - logSource = buffer.get(); + byte logSourceVersion = buffer.get(); + logSource = (byte) (logSourceVersion & 0x3); + version = (byte) ((logSourceVersion & 0xff) >> 2); + ITreeIndexTupleReference readOld; + ITreeIndexTupleReference readNew; + if (version == V_0) { + if (readOldValueV0 == null) { + readOldValueV0 = new SimpleTupleReferenceV0(); + readNewValueV0 = new SimpleTupleReferenceV0(); + } + readOld 
= readOldValueV0; + readNew = readNewValueV0; + } else { + readOld = readOldValue; + readNew = readNewValue; + } logType = buffer.get(); txnId = buffer.getLong(); switch (logType) { @@ -276,7 +295,7 @@ public class LogRecord implements ILogRecord { case LogType.JOB_COMMIT: case LogType.ABORT: datasetId = -1; - PKHashValue = -1; + pKHashValue = -1; computeAndSetLogSize(); break; case LogType.ENTITY_COMMIT: @@ -288,7 +307,7 @@ public class LogRecord implements ILogRecord { break; case LogType.UPDATE: if (readEntityResource(buffer) && readEntityValue(buffer)) { - return readUpdateInfo(buffer); + return readUpdateInfo(buffer, readNew, readOld); } else { return RecordReadStatus.TRUNCATED; } @@ -316,7 +335,7 @@ public class LogRecord implements ILogRecord { break; case LogType.FILTER: if (readEntityResource(buffer)) { - return readUpdateInfo(buffer); + return readUpdateInfo(buffer, readNew, readOld); } else { return RecordReadStatus.TRUNCATED; } @@ -331,16 +350,16 @@ public class LogRecord implements ILogRecord { if (buffer.remaining() < ENTITY_VALUE_HEADER_LEN) { return false; } - PKHashValue = buffer.getInt(); - PKValueSize = buffer.getInt(); + pKHashValue = buffer.getInt(); + pKValueSize = buffer.getInt(); // attempt to read in the PK - if (buffer.remaining() < PKValueSize) { + if (buffer.remaining() < pKValueSize) { return false; } - if (PKValueSize <= 0) { + if (pKValueSize <= 0) { throw new IllegalStateException("Primary Key Size is less than or equal to 0"); } - PKValue = readPKValue(buffer); + pKValue = readPKValue(buffer); return true; } @@ -354,7 +373,8 @@ public class LogRecord implements ILogRecord { return true; } - private RecordReadStatus readUpdateInfo(ByteBuffer buffer) { + private RecordReadStatus readUpdateInfo(ByteBuffer buffer, ITreeIndexTupleReference newRead, + ITreeIndexTupleReference oldRead) { if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) { return RecordReadStatus.TRUNCATED; } @@ -369,7 +389,7 @@ public class LogRecord 
implements ILogRecord { } return RecordReadStatus.TRUNCATED; } - newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize); + newValue = readTuple(buffer, newRead, newValueFieldCount, newValueSize); if (logSize > getUpdateLogSizeWithoutOldValue()) { // Prev Image exists if (buffer.remaining() < Integer.BYTES) { @@ -383,7 +403,7 @@ public class LogRecord implements ILogRecord { if (buffer.remaining() < oldValueSize) { return RecordReadStatus.TRUNCATED; } - oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize); + oldValue = readTuple(buffer, oldRead, oldValueFieldCount, oldValueSize); } else { oldValueSize = 0; oldValue = null; @@ -402,15 +422,15 @@ public class LogRecord implements ILogRecord { } private ITupleReference readPKValue(ByteBuffer buffer) { - if (buffer.position() + PKValueSize > buffer.limit()) { + if (buffer.position() + pKValueSize > buffer.limit()) { throw new BufferUnderflowException(); } - readPKValue.reset(buffer.array(), buffer.position(), PKValueSize); - buffer.position(buffer.position() + PKValueSize); + readPKValue.reset(buffer.array(), buffer.position(), pKValueSize); + buffer.position(buffer.position() + pKValueSize); return readPKValue; } - private static ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, + private static ITupleReference readTuple(ByteBuffer srcBuffer, ITreeIndexTupleReference destTuple, int fieldCnt, int size) { if (srcBuffer.position() + size > srcBuffer.limit()) { throw new BufferUnderflowException(); @@ -424,9 +444,9 @@ public class LogRecord implements ILogRecord { @Override public void computeAndSetPKValueSize() { int i; - PKValueSize = 0; - for (i = 0; i < PKFieldCnt; i++) { - PKValueSize += PKValue.getFieldLength(PKFields[i]); + pKValueSize = 0; + for (i = 0; i < pKFieldCnt; i++) { + pKValueSize += pKValue.getFieldLength(pKFields[i]); } } @@ -442,7 +462,7 @@ public class LogRecord implements ILogRecord { } private int 
getUpdateLogSizeWithoutOldValue() { - return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize; + return UPDATE_LOG_BASE_SIZE + pKValueSize + newValueSize; } @Override @@ -456,7 +476,7 @@ public class LogRecord implements ILogRecord { logSize = JOB_TERMINATE_LOG_SIZE; break; case LogType.ENTITY_COMMIT: - logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize; + logSize = ENTITY_COMMIT_LOG_BASE_SIZE + pKValueSize; break; case LogType.FLUSH: logSize = FLUSH_LOG_SIZE; @@ -491,13 +511,14 @@ public class LogRecord implements ILogRecord { if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) { builder.append(" DatasetId : ").append(datasetId); builder.append(" ResourcePartition : ").append(resourcePartition); - builder.append(" PKHashValue : ").append(PKHashValue); - builder.append(" PKFieldCnt : ").append(PKFieldCnt); - builder.append(" PKSize: ").append(PKValueSize); + builder.append(" PKHashValue : ").append(pKHashValue); + builder.append(" PKFieldCnt : ").append(pKFieldCnt); + builder.append(" PKSize: ").append(pKValueSize); } if (logType == LogType.UPDATE) { builder.append(" ResourceId : ").append(resourceId); } + builder.append(" Version : ").append(version); return builder.toString(); } @@ -557,12 +578,12 @@ public class LogRecord implements ILogRecord { @Override public int getPKHashValue() { - return PKHashValue; + return pKHashValue; } @Override - public void setPKHashValue(int PKHashValue) { - this.PKHashValue = PKHashValue; + public void setPKHashValue(int pKHashValue) { + this.pKHashValue = pKHashValue; } @Override @@ -644,23 +665,23 @@ public class LogRecord implements ILogRecord { @Override public int getPKValueSize() { - return PKValueSize; + return pKValueSize; } @Override public ITupleReference getPKValue() { - return PKValue; + return pKValue; } @Override public void setPKFields(int[] primaryKeyFields) { - PKFields = primaryKeyFields; - PKFieldCnt = PKFields.length; + pKFields = primaryKeyFields; + pKFieldCnt = pKFields.length; } @Override - 
public void setPKValue(ITupleReference PKValue) { - this.PKValue = PKValue; + public void setPKValue(ITupleReference pKValue) { + this.pKValue = pKValue; } public PrimaryIndexOperationTracker getOpTracker() { @@ -669,6 +690,11 @@ public class LogRecord implements ILogRecord { @Override public void setLogSource(byte logSource) { + if (logSource < LOG_SOURCE_MIN) { + throw new IllegalArgumentException("logSource underflow: " + logSource); + } else if (logSource > LOG_SOURCE_MAX) { + throw new IllegalArgumentException("logSource overflow: " + logSource); + } this.logSource = logSource; } @@ -678,7 +704,7 @@ public class LogRecord implements ILogRecord { } public void setPKFieldCnt(int pKFieldCnt) { - PKFieldCnt = pKFieldCnt; + this.pKFieldCnt = pKFieldCnt; } public void setOpTracker(PrimaryIndexOperationTracker opTracker) { @@ -782,4 +808,19 @@ public class LogRecord implements ILogRecord { public void setFlushingComponentMaxId(long flushingComponentMaxId) { this.flushingComponentMaxId = flushingComponentMaxId; } + + @Override + public int getVersion() { + return version; + } + + @Override + public void setVersion(int version) { + if (version < VERSION_MIN) { + throw new IllegalArgumentException("version underflow: " + version); + } else if (version > VERSION_MAX) { + throw new IllegalArgumentException("version overflow: " + version); + } + this.version = (byte) version; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java index 690eeb6..25ff401 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java @@ -58,15 +58,15 @@ public class TransactionUtil { } public static void formEntityCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId, - int 
PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) { + int pKHashValue, ITupleReference pKValue, int[] pKFields, int resourcePartition, byte entityCommitType) { logRecord.setTxnCtx(txnCtx); logRecord.setLogType(entityCommitType); logRecord.setTxnId(txnCtx.getTxnId().getId()); logRecord.setDatasetId(datasetId); - logRecord.setPKHashValue(PKHashValue); - logRecord.setPKFieldCnt(PKFields.length); - logRecord.setPKValue(PKValue); - logRecord.setPKFields(PKFields); + logRecord.setPKHashValue(pKHashValue); + logRecord.setPKFieldCnt(pKFields.length); + logRecord.setPKValue(pKValue); + logRecord.setPKFields(pKFields); logRecord.setResourcePartition(resourcePartition); logRecord.computeAndSetPKValueSize(); logRecord.computeAndSetLogSize(); diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java index 308920b..9123a5d 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java @@ -19,7 +19,6 @@ package org.apache.asterix.test.server; import java.io.File; -import java.io.FilenameFilter; import java.util.ArrayList; import java.util.Collection; import java.util.Map; diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql new file mode 100644 index 0000000..7d441cd --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +create_and_start.sh \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql new file mode 100644 index 0000000..02f3a72 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : large_object_100K + * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB) + * Expected Result : Success + * Date : December 25 2018 + */ + +drop dataverse recovery if exists; +create dataverse recovery; +use dataverse recovery; + +create type RecoveryType as { + uid: uuid, + numbers: [int] +}; + +create dataset RecoveryDataset (RecoveryType) +primary key uid autogenerated; diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql new file mode 100644 index 0000000..1e24bcf --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Test case Name : large_object_100K + * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB) + * Expected Result : Success + * Date : December 25 2018 + */ + +use dataverse recovery; + +//Create a 100KB record (8-bytes * 12800 = 102400 bytes) +insert into dataset RecoveryDataset ( + {"numbers": range(1, 12800)} +); \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql new file mode 100644 index 0000000..2e19c39 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Test case Name : large_object_100K + * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB) + * Expected Result : Success + * Date : December 25 2018 + */ + +use dataverse recovery; + +for $x in dataset RecoveryDataset +return count($x.numbers); \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql new file mode 100644 index 0000000..4583455 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +kill_cc_and_nc.sh \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql new file mode 100644 index 0000000..7087cd3 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +stop_and_start.sh \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql new file mode 100644 index 0000000..2e19c39 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Test case Name : large_object_100K + * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB) + * Expected Result : Success + * Date : December 25 2018 + */ + +use dataverse recovery; + +for $x in dataset RecoveryDataset +return count($x.numbers); \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql new file mode 100644 index 0000000..40df6fb --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +stop_and_delete.sh \ No newline at end of file diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh new file mode 100755 index 0000000..e358618 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +$NCSERVICE_HOME/opt/local/bin/start-sample-cluster.sh 1>/dev/null 2>&1; diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh new file mode 100755 index 0000000..b6326cc --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9 +ps -ef | awk '/java.*org\.apache\.hyracks\.control\.nc\.service\.NCService/ {print $2}' | xargs -n 1 kill -9 diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh new file mode 100755 index 0000000..818d17d --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +$NCSERVICE_HOME/opt/local/bin/stop-sample-cluster.sh; +rm -rf $NCSERVICE_HOME/opt/local/data; + diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh new file mode 100755 index 0000000..9a0c506 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+$NCSERVICE_HOME/opt/local/bin/stop-sample-cluster.sh; +$NCSERVICE_HOME/opt/local/bin/start-sample-cluster.sh; diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml b/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml index 38179b2..8343e85 100644 --- a/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml +++ b/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml @@ -163,6 +163,12 @@ <!-- <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error> --> </compilation-unit> </test-case> + + <test-case FilePath="recover_after_abort"> + <compilation-unit name="large_object_100K"> + <output-dir compare="Text">large_object_100K</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="recovery_ddl"> diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 28290bb..1aa040d 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -31,6 +31,7 @@ import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ILogRequester; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.LogConstants; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; @@ -74,7 +75,7 @@ public class LogBuffer implements ILogBuffer { full = new AtomicBoolean(false); appendOffset 
= 0; flushOffset = 0; - syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE); + syncCommitQ = new LinkedBlockingQueue<>(logPageSize / LogConstants.JOB_TERMINATE_LOG_SIZE); flushQ = new LinkedBlockingQueue<>(); remoteJobsQ = new LinkedBlockingQueue<>(); reusableTxnId = new MutableTxnId(-1); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index a990379..e66185c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -19,7 +19,6 @@ package org.apache.asterix.transaction.management.service.logging; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.io.OutputStream; import java.io.RandomAccessFile; @@ -47,6 +46,7 @@ import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.LogConstants; import org.apache.asterix.common.transactions.LogManagerProperties; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; @@ -83,8 +83,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent { private FileChannel appendChannel; private ILogBuffer appendPage; private LogFlusher logFlusher; - private Future<? 
extends Object> futureLogFlusher; - protected LinkedBlockingQueue<ILogRecord> flushLogsQ; + private Future<?> futureLogFlusher; + private LinkedBlockingQueue<ILogRecord> flushLogsQ; private long currentLogFileId; public LogManager(ITransactionSubsystem txnSubsystem) { @@ -448,15 +448,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { if (!fileLogDir.isDirectory()) { throw new IllegalStateException("log dir " + logDir + " exists but it is not a directory"); } - logFileNames = fileLogDir.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - if (name.startsWith(logFilePrefix)) { - return true; - } - return false; - } - }); + logFileNames = fileLogDir.list((dir, name) -> name.startsWith(logFilePrefix)); if (logFileNames == null) { throw new IllegalStateException("listing of log dir (" + logDir + ") files returned null. " + "Either an IO error occurred or the dir was just deleted by another process/thread"); @@ -627,7 +619,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { class LogFlusher implements Callable<Boolean> { private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(); - private static final ILogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null); + private static final ILogBuffer POISON_PILL = new LogBuffer(null, LogConstants.JOB_TERMINATE_LOG_SIZE, null); private final LogManager logMgr;//for debugging private final LinkedBlockingQueue<ILogBuffer> emptyQ; private final LinkedBlockingQueue<ILogBuffer> flushQ; diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java index 3d79adc..30caab7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java +++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java @@ -239,37 +239,37 @@ public class LogReader implements ILogReader { throw new ACIDException(e); } - ByteBuffer readBuffer = this.readBuffer; + readRecord(lsn); + logRecord.setLSN(readLSN); + readLSN += logRecord.getLogSize(); + return logRecord; + } + + private void readRecord(long lsn) { + ByteBuffer buffer = this.readBuffer; while (true) { - RecordReadStatus status = logRecord.readLogRecord(readBuffer); + RecordReadStatus status = logRecord.readLogRecord(buffer); switch (status) { - case LARGE_RECORD: { - readBuffer = ByteBuffer.allocate(logRecord.getLogSize()); - fillLogReadBuffer(logRecord.getLogSize(), readBuffer); + case LARGE_RECORD: + buffer = ByteBuffer.allocate(logRecord.getLogSize()); + fillLogReadBuffer(logRecord.getLogSize(), buffer); //now see what we have in the refilled buffer - continue; - } - case TRUNCATED: { + break; + case TRUNCATED: if (!fillLogReadBuffer()) { throw new IllegalStateException( "Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId()); } //now read the complete log record - continue; - } - case BAD_CHKSUM: { + break; + case BAD_CHKSUM: throw new ACIDException("Log record has incorrect checksum"); - } case OK: - break; + return; default: throw new IllegalStateException("Unexpected log read status: " + status); } - break; } - logRecord.setLSN(readLSN); - readLSN += logRecord.getLogSize(); - return logRecord; } private void getLogFile() { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java index af74b13..cd13de5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java +++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java @@ -18,10 +18,10 @@ */ package org.apache.asterix.transaction.management.service.recovery; +import static org.apache.asterix.common.transactions.LogConstants.*; + import java.nio.ByteBuffer; -import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -165,7 +165,7 @@ public class TxnEntityId { public int getCurrentSize() { //txn id, dataset id, pkHashValue, arraySize, isByteArrayPKValue - int size = TxnId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES; + int size = TxnId.BYTES + DS_LEN + PKHASH_LEN + PKSZ_LEN + Byte.BYTES; //byte arraySize if (isByteArrayPKValue && byteArrayPKValue != null) { size += byteArrayPKValue.length; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java index e82b037..cc485ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.common.tuples; -import org.apache.hyracks.data.std.primitive.ShortPointable; +import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference; import org.apache.hyracks.storage.am.common.util.BitOperationUtils; @@ -71,10 
+71,10 @@ public class SimpleTupleReference implements ITreeIndexTupleReference { @Override public int getFieldLength(int fIdx) { if (fIdx == 0) { - return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes); + return IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes); } else { - return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + fIdx * 2) - - ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2)); + return IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + fIdx * Integer.BYTES) + - IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * Integer.BYTES)); } } @@ -84,7 +84,7 @@ public class SimpleTupleReference implements ITreeIndexTupleReference { return tupleStartOff + nullFlagsBytes + fieldSlotsBytes; } else { return tupleStartOff + nullFlagsBytes + fieldSlotsBytes - + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2)); + + IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * Integer.BYTES)); } } @@ -93,12 +93,12 @@ public class SimpleTupleReference implements ITreeIndexTupleReference { } protected int getFieldSlotsBytes() { - return fieldCount * 2; + return fieldCount * Integer.BYTES; } @Override public int getTupleSize() { return nullFlagsBytes + fieldSlotsBytes - + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + (fieldCount - 1) * 2); + + IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + (fieldCount - 1) * Integer.BYTES); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java similarity index 57% copy from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java 
copy to hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java index e82b037..0b4e8c5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java @@ -20,53 +20,11 @@ package org.apache.hyracks.storage.am.common.tuples; import org.apache.hyracks.data.std.primitive.ShortPointable; -import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; -import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference; -import org.apache.hyracks.storage.am.common.util.BitOperationUtils; -public class SimpleTupleReference implements ITreeIndexTupleReference { - - protected byte[] buf; - protected int fieldStartIndex; - protected int fieldCount; - protected int tupleStartOff; - protected int nullFlagsBytes; - protected int fieldSlotsBytes; - - @Override - public void resetByTupleOffset(byte[] buf, int tupleStartOff) { - this.buf = buf; - this.tupleStartOff = tupleStartOff; - } - - @Override - public void resetByTupleIndex(ITreeIndexFrame frame, int tupleIndex) { - resetByTupleOffset(frame.getBuffer().array(), frame.getTupleOffset(tupleIndex)); - } - - @Override - public void setFieldCount(int fieldCount) { - this.fieldCount = fieldCount; - nullFlagsBytes = getNullFlagsBytes(); - fieldSlotsBytes = getFieldSlotsBytes(); - fieldStartIndex = 0; - } - - @Override - public void setFieldCount(int fieldStartIndex, int fieldCount) { - this.fieldCount = fieldCount; - this.fieldStartIndex = fieldStartIndex; - } - - @Override - public int getFieldCount() { - return fieldCount; - } - - @Override - public byte[] getFieldData(int fIdx) { - return buf; - } +/** + * This class is only used to read tuple references from log version 0 + */ +public class SimpleTupleReferenceV0 
extends SimpleTupleReference { @Override public int getFieldLength(int fIdx) { @@ -88,10 +46,7 @@ public class SimpleTupleReference implements ITreeIndexTupleReference { } } - protected int getNullFlagsBytes() { - return BitOperationUtils.getFlagBytes(fieldCount); - } - + @Override protected int getFieldSlotsBytes() { return fieldCount * 2; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java index 410a0e3..ca7217e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.common.tuples; import java.nio.ByteBuffer; +import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter; import org.apache.hyracks.storage.am.common.util.BitOperationUtils; @@ -35,12 +36,6 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter { private SimpleTupleWriter() { } - // Write short in little endian to target byte array at given offset. 
- private static void writeShortL(short s, byte[] buf, int targetOff) { - buf[targetOff] = (byte) (s >> 8); - buf[targetOff + 1] = (byte) (s >> 0); - } - @Override public int bytesRequired(ITupleReference tuple) { int bytes = getNullFlagsBytes(tuple) + getFieldSlotsBytes(tuple); @@ -83,7 +78,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter { System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), targetBuf, runner, tuple.getFieldLength(i)); fieldEndOff += tuple.getFieldLength(i); runner += tuple.getFieldLength(i); - writeShortL((short) fieldEndOff, targetBuf, targetOff + nullFlagsBytes + i * 2); + IntegerPointable.setInteger(targetBuf, targetOff + nullFlagsBytes + i * Integer.BYTES, fieldEndOff); } return runner - targetOff; } @@ -103,7 +98,8 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter { System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), targetBuf, runner, tuple.getFieldLength(i)); fieldEndOff += tuple.getFieldLength(i); runner += tuple.getFieldLength(i); - writeShortL((short) fieldEndOff, targetBuf, targetOff + nullFlagsBytes + fieldCounter * 2); + IntegerPointable.setInteger(targetBuf, targetOff + nullFlagsBytes + fieldCounter * Integer.BYTES, + fieldEndOff); fieldCounter++; } @@ -115,7 +111,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter { } protected int getFieldSlotsBytes(ITupleReference tuple) { - return tuple.getFieldCount() * 2; + return tuple.getFieldCount() * Integer.BYTES; } protected int getNullFlagsBytes(ITupleReference tuple, int startField, int numFields) { @@ -123,7 +119,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter { } protected int getFieldSlotsBytes(ITupleReference tuple, int startField, int numFields) { - return numFields * 2; + return numFields * Integer.BYTES; } @Override