http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java new file mode 100644 index 0000000..4b9e646 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tephra.persist; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.Method; + +/** + * Utility for handling HDFS file lease recovery. 
This is a copy-n-paste fork of + * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12), + * which contains some additional fixes not present in our current HBase dependency version -- + * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that + * recovery succeeded. + */ +public class HDFSUtil { + private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class); + /** + * Recover the lease from HDFS, retrying multiple times. + */ + public void recoverFileLease(final FileSystem fs, final Path p, + Configuration conf) + throws IOException { + // lease recovery not needed for local file system case. + if (!(fs instanceof DistributedFileSystem)) { + return; + } + recoverDFSFileLease((DistributedFileSystem) fs, p, conf); + } + + /* + * Run the dfs recover lease. recoverLease is asynchronous. It returns: + * -false when it starts the lease recovery (i.e. lease recovery not *yet* done) + * - true when the lease recovery has succeeded or the file is closed. + * But, we have to be careful. Each time we call recoverLease, it starts the recover lease + * process over from the beginning. We could put ourselves in a situation where we are + * doing nothing but starting a recovery, interrupting it to start again, and so on. + * The findings over in HBASE-8354 have it that the namenode will try to recover the lease + * on the file's primary node. If all is well, it should return near immediately. But, + * as is common, it is the very primary node that has crashed and so the namenode will be + * stuck waiting on a socket timeout before it will ask another datanode to start the + * recovery. It does not help if we call recoverLease in the meantime and in particular, + * subsequent to the socket timeout, a recoverLease invocation will cause us to start + * over from square one (possibly waiting on socket timeout against primary node). 
So, + * in the below, we do the following: + * 1. Call recoverLease. + * 2. If it returns true, break. + * 3. If it returns false, wait a few seconds and then call it again. + * 4. If it returns true, break. + * 5. If it returns false, wait for what we think the datanode socket timeout is + * (configurable) and then try again. + * 6. If it returns true, break. + * 7. If it returns false, repeat starting at step 5. above. + * + * If HDFS-4525 is available, call it every second and we might be able to exit early. + */ + boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, + final Configuration conf) + throws IOException { + LOG.info("Recovering lease on dfs file " + p); + long startWaiting = System.currentTimeMillis(); + // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + // beyond that limit 'to be safe'. + long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; + // This setting should be what the cluster dfs heartbeat is set to. + long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000); + // This should be set to how long it'll take for us to timeout against primary datanode if it + // is dead. We set it to 61 seconds, 1 second than the default READ_TIMEOUT in HDFS, the + // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. + long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000); + + Method isFileClosedMeth = null; + // whether we need to look for isFileClosed method + boolean findIsFileClosedMeth = true; + boolean recovered = false; + // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. 
+ for (int nbAttempt = 0; !recovered; nbAttempt++) { + recovered = recoverLease(dfs, nbAttempt, p, startWaiting); + if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { + break; + } + try { + // On the first time through wait the short 'firstPause'. + if (nbAttempt == 0) { + Thread.sleep(firstPause); + } else { + // Cycle here until subsequentPause elapses. While spinning, check isFileClosed if + // available (should be in hadoop 2.0.5... not in hadoop 1 though. + long localStartWaiting = System.currentTimeMillis(); + while ((System.currentTimeMillis() - localStartWaiting) < + subsequentPause) { + Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); + if (findIsFileClosedMeth) { + try { + isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", + new Class[]{ Path.class }); + } catch (NoSuchMethodException nsme) { + LOG.debug("isFileClosed not available"); + } finally { + findIsFileClosedMeth = false; + } + } + if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { + recovered = true; + break; + } + } + } + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + return recovered; + } + + boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, + final int nbAttempt, final Path p, final long startWaiting) { + if (recoveryTimeout < System.currentTimeMillis()) { + LOG.warn("Cannot recoverLease after trying for " + + conf.getInt("hbase.lease.recovery.timeout", 900000) + + "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + + getLogMessageDetail(nbAttempt, p, startWaiting)); + return true; + } + return false; + } + + /** + * Try to recover the lease. + * @param dfs The filesystem instance. + * @param nbAttempt Count number of this attempt. + * @param p Path of the file to recover. + * @param startWaiting Timestamp of when we started attempting to recover the file lease. 
+ * @return True if dfs#recoverLease came by true. + * @throws java.io.FileNotFoundException + */ + boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p, + final long startWaiting) + throws FileNotFoundException { + boolean recovered = false; + try { + recovered = dfs.recoverLease(p); + LOG.info("recoverLease=" + recovered + ", " + + getLogMessageDetail(nbAttempt, p, startWaiting)); + } catch (IOException e) { + if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { + // This exception comes out instead of FNFE, fix it + throw new FileNotFoundException("The given file wasn't found at " + p); + } else if (e instanceof FileNotFoundException) { + throw (FileNotFoundException) e; + } + LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); + } + return recovered; + } + + /** + * @param nbAttempt Attempt number for the lease recovery. + * @param p Path of the file to recover. + * @param startWaiting Timestamp of when we started attempting to recover the file lease. + * @return Detail to append to any log message around lease recovering. + */ + private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) { + return "attempt=" + nbAttempt + " on file=" + p + " after " + + (System.currentTimeMillis() - startWaiting) + "ms"; + } + + /** + * Call HDFS-4525 isFileClosed if it is available. + * @param dfs Filesystem instance to use. + * @param m Method instance to call. + * @param p Path of the file to check is closed. + * @return True if file is closed. + */ + private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) { + try { + return (Boolean) m.invoke(dfs, p); + } catch (SecurityException e) { + LOG.warn("No access", e); + } catch (Exception e) { + LOG.warn("Failed invocation for " + p.toString(), e); + } + return false; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java new file mode 100644 index 0000000..d81ba38 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import org.apache.tephra.metrics.MetricsCollector; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * Reads and writes transaction logs against files in the local filesystem. 
+ */ +public class LocalFileTransactionLog extends AbstractTransactionLog { + private final File logFile; + + /** + * Creates a new transaction log using the given file instance. + * @param logFile The log file to use. + */ + public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) { + super(timestamp, metricsCollector); + this.logFile = logFile; + } + + @Override + public String getName() { + return logFile.getAbsolutePath(); + } + + @Override + protected TransactionLogWriter createWriter() throws IOException { + return new LogWriter(logFile); + } + + @Override + public TransactionLogReader getReader() throws IOException { + return new LogReader(logFile); + } + + private static final class LogWriter implements TransactionLogWriter { + private final FileOutputStream fos; + private final DataOutputStream out; + + public LogWriter(File logFile) throws IOException { + this.fos = new FileOutputStream(logFile); + this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE)); + } + + @Override + public void append(Entry entry) throws IOException { + entry.write(out); + } + + @Override + public void commitMarker(int count) throws IOException { + // skip for local file + } + + @Override + public void sync() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.flush(); + out.close(); + fos.close(); + } + } + + private static final class LogReader implements TransactionLogReader { + private final FileInputStream fin; + private final DataInputStream in; + private Entry reuseEntry = new Entry(); + + public LogReader(File logFile) throws IOException { + this.fin = new FileInputStream(logFile); + this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE)); + } + + @Override + public TransactionEdit next() throws IOException { + Entry entry = new Entry(); + try { + entry.readFields(in); + } catch 
(EOFException eofe) { + // signal end of file by returning null + return null; + } + return entry.getEdit(); + } + + @Override + public TransactionEdit next(TransactionEdit reuse) throws IOException { + try { + reuseEntry.getKey().readFields(in); + reuse.readFields(in); + } catch (EOFException eofe) { + // signal end of file by returning null + return null; + } + return reuse; + } + + @Override + public void close() throws IOException { + in.close(); + fin.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java new file mode 100644 index 0000000..beddbb2 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.persist; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import com.google.common.primitives.Longs; +import com.google.inject.Inject; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Persists transaction snapshots and write-ahead logs to files on the local filesystem. + */ +public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage { + private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress."; + private static final String SNAPSHOT_FILE_PREFIX = "snapshot."; + private static final String LOG_FILE_PREFIX = "txlog."; + private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class); + static final int BUFFER_SIZE = 16384; + + private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() { + @Override + public boolean accept(File file, String s) { + return s.startsWith(SNAPSHOT_FILE_PREFIX); + } + }; + + private final String configuredSnapshotDir; + private final MetricsCollector metricsCollector; + private File snapshotDir; + + @Inject + public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider, + MetricsCollector metricsCollector) { + super(codecProvider); + this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR); + 
this.metricsCollector = metricsCollector; + } + + @Override + protected void startUp() throws Exception { + Preconditions.checkState(configuredSnapshotDir != null, + "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR + + " in configuration."); + snapshotDir = new File(configuredSnapshotDir); + } + + @Override + protected void shutDown() throws Exception { + // nothing to do + } + + @Override + public String getLocation() { + return snapshotDir.getAbsolutePath(); + } + + @Override + public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { + // save the snapshot to a temporary file + File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); + LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile); + OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput(); + boolean threw = true; + try { + codecProvider.encode(out, snapshot); + threw = false; + } finally { + Closeables.close(out, threw); + } + + // move the temporary file into place with the correct filename + File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); + if (!snapshotTmpFile.renameTo(finalFile)) { + throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " + + finalFile.getName()); + } + + LOG.debug("Completed snapshot to file {}", finalFile); + } + + @Override + public TransactionSnapshot getLatestSnapshot() throws IOException { + InputStream is = getLatestSnapshotInputStream(); + if (is == null) { + return null; + } + try { + return readSnapshotFile(is); + } finally { + is.close(); + } + } + + @Override + public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { + InputStream is = getLatestSnapshotInputStream(); + if (is == null) { + return null; + } + try { + return codecProvider.decodeTransactionVisibilityState(is); + } finally { + is.close(); + } + } 
+ + private InputStream getLatestSnapshotInputStream() throws IOException { + File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); + TimestampedFilename mostRecent = null; + for (File file : snapshotFiles) { + TimestampedFilename tsFile = new TimestampedFilename(file); + if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) { + mostRecent = tsFile; + } + } + + if (mostRecent == null) { + LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath()); + return null; + } + + return new FileInputStream(mostRecent.getFile()); + } + + private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException { + return codecProvider.decode(is); + } + + @Override + public long deleteOldSnapshots(int numberToKeep) throws IOException { + File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); + if (snapshotFiles.length == 0) { + return -1; + } + TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length]; + for (int i = 0; i < snapshotFiles.length; i++) { + snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]); + } + Arrays.sort(snapshotFilenames, Collections.reverseOrder()); + if (snapshotFilenames.length <= numberToKeep) { + // nothing to delete, just return the oldest timestamp + return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp(); + } + int toRemoveCount = snapshotFilenames.length - numberToKeep; + TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount]; + System.arraycopy(snapshotFilenames, numberToKeep, toRemove, 0, toRemoveCount); + int removedCnt = 0; + for (int i = 0; i < toRemove.length; i++) { + File currentFile = toRemove[i].getFile(); + LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath()); + if (!toRemove[i].getFile().delete()) { + LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath()); + } else { + removedCnt++; + } + } + long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp(); + 
LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp); + return oldestTimestamp; + } + + @Override + public List<String> listSnapshots() throws IOException { + File[] snapshots = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); + return Lists.transform(Arrays.asList(snapshots), new Function<File, String>() { + @Nullable + @Override + public String apply(@Nullable File input) { + return input.getName(); + } + }); + } + + @Override + public List<TransactionLog> getLogsSince(long timestamp) throws IOException { + File[] logFiles = snapshotDir.listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE)); + TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length]; + for (int i = 0; i < logFiles.length; i++) { + timestampedFiles[i] = new TimestampedFilename(logFiles[i]); + } + // logs need to be processed in ascending order + Arrays.sort(timestampedFiles); + return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() { + @Nullable + @Override + public TransactionLog apply(@Nullable TimestampedFilename input) { + return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector); + } + }); + } + + @Override + public TransactionLog createLog(long timestamp) throws IOException { + File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp); + LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath()); + return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector); + } + + @Override + public void deleteLogsOlderThan(long timestamp) throws IOException { + File[] logFiles = snapshotDir.listFiles(new LogFileFilter(0, timestamp)); + int removedCnt = 0; + for (File file : logFiles) { + LOG.debug("Removing old transaction log {}", file.getPath()); + if (file.delete()) { + removedCnt++; + } else { + LOG.warn("Failed to remove log file {}", file.getAbsolutePath()); + } + } + LOG.debug("Removed 
{} transaction logs older than {}", removedCnt, timestamp); + } + + @Override + public void setupStorage() throws IOException { + // create the directory if it doesn't exist + if (!snapshotDir.exists()) { + if (!snapshotDir.mkdirs()) { + throw new IOException("Failed to create directory " + configuredSnapshotDir + + " for transaction snapshot storage"); + } + } else { + Preconditions.checkState(snapshotDir.isDirectory(), + "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!"); + Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " + + configuredSnapshotDir + " exists but is not writable!"); + } + } + + @Override + public List<String> listLogs() throws IOException { + File[] logs = snapshotDir.listFiles(new LogFileFilter(0, Long.MAX_VALUE)); + return Lists.transform(Arrays.asList(logs), new Function<File, String>() { + @Nullable + @Override + public String apply(@Nullable File input) { + return input.getName(); + } + }); + } + + private static class LogFileFilter implements FilenameFilter { + private final long startTime; + private final long endTime; + + public LogFileFilter(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public boolean accept(File file, String s) { + if (s.startsWith(LOG_FILE_PREFIX)) { + String[] parts = s.split("\\."); + if (parts.length == 2) { + try { + long fileTime = Long.parseLong(parts[1]); + return fileTime >= startTime && fileTime < endTime; + } catch (NumberFormatException ignored) { + LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s); + } + } + } + return false; + } + } + + /** + * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both + * snapshot and transaction log filenames. 
+ */ + private static class TimestampedFilename implements Comparable<TimestampedFilename> { + private File file; + private String prefix; + private long timestamp; + + public TimestampedFilename(File file) { + this.file = file; + String[] parts = file.getName().split("\\."); + if (parts.length != 2) { + throw new IllegalArgumentException("Filename " + file.getName() + + " did not match the expected pattern prefix.timestamp"); + } + prefix = parts[0]; + timestamp = Long.parseLong(parts[1]); + } + + public File getFile() { + return file; + } + + public String getPrefix() { + return prefix; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public int compareTo(TimestampedFilename other) { + int res = prefix.compareTo(other.getPrefix()); + if (res == 0) { + res = Longs.compare(timestamp, other.getTimestamp()); + } + return res; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java new file mode 100644 index 0000000..12f2475 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import org.apache.tephra.snapshot.SnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecProvider; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Minimal {@link TransactionStateStorage} implementation that does nothing, i.e. does not maintain any actual state. + */ +public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { + + private final SnapshotCodec codec; + + @Inject + public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) { + codec = codecProvider; + } + + @Override + protected void startUp() throws Exception { + } + + @Override + protected void shutDown() throws Exception { + } + + @Override + public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { + codec.encode(out, snapshot); + } + + @Override + public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { + } + + @Override + public TransactionSnapshot getLatestSnapshot() throws IOException { + return null; + } + + @Override + public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { + return null; + } + + @Override + public long deleteOldSnapshots(int numberToKeep) throws IOException { + return 0; + } + + @Override + public List<String> listSnapshots() throws IOException { + return new ArrayList<>(0); + } + + 
@Override + public List<TransactionLog> getLogsSince(long timestamp) throws IOException { + return new ArrayList<>(0); + } + + @Override + public TransactionLog createLog(long timestamp) throws IOException { + return new NoOpTransactionLog(); + } + + @Override + public void deleteLogsOlderThan(long timestamp) throws IOException { + } + + @Override + public void setupStorage() throws IOException { + } + + @Override + public List<String> listLogs() throws IOException { + return new ArrayList<>(0); + } + + @Override + public String getLocation() { + return "no-op"; + } + + private static class NoOpTransactionLog implements TransactionLog { + private long timestamp = System.currentTimeMillis(); + + @Override + public String getName() { + return "no-op"; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public void append(TransactionEdit edit) throws IOException { + } + + @Override + public void append(List<TransactionEdit> edits) throws IOException { + } + + @Override + public void close() { + } + + @Override + public TransactionLogReader getReader() { + return new TransactionLogReader() { + @Override + public TransactionEdit next() { + return null; + } + + @Override + public TransactionEdit next(TransactionEdit reuse) { + return null; + } + + @Override + public void close() { + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java new file mode 100644 index 0000000..1d07e72 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.base.Objects; +import com.google.common.collect.Sets; +import org.apache.hadoop.io.Writable; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; + +/** + * Represents a transaction state change in the {@link TransactionLog}. + */ +public class TransactionEdit implements Writable { + + /** + * The possible state changes for a transaction. + */ + public enum State { + INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT + } + + private long writePointer; + + /** + * stores the value of visibility upper bound + * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()}) + * for edit of {@link State#INPROGRESS} only + */ + private long visibilityUpperBound; + private long commitPointer; + private long expirationDate; + private State state; + private Set<ChangeId> changes; + /** Whether or not the COMMITTED change should be fully committed. 
*/ + private boolean canCommit; + private TransactionType type; + private Set<Long> truncateInvalidTx; + private long truncateInvalidTxTime; + private long parentWritePointer; + private long[] checkpointPointers; + + // for Writable + public TransactionEdit() { + this.changes = Sets.newHashSet(); + this.truncateInvalidTx = Sets.newHashSet(); + } + + // package private for testing + TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate, + Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type, + Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer, + long[] checkpointPointers) { + this.writePointer = writePointer; + this.visibilityUpperBound = visibilityUpperBound; + this.state = state; + this.expirationDate = expirationDate; + this.changes = changes != null ? changes : Collections.<ChangeId>emptySet(); + this.commitPointer = commitPointer; + this.canCommit = canCommit; + this.type = type; + this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet(); + this.truncateInvalidTxTime = truncateInvalidTxTime; + this.parentWritePointer = parentWritePointer; + this.checkpointPointers = checkpointPointers; + } + + /** + * Returns the transaction write pointer assigned for the state change. + */ + public long getWritePointer() { + return writePointer; + } + + void setWritePointer(long writePointer) { + this.writePointer = writePointer; + } + + public long getVisibilityUpperBound() { + return visibilityUpperBound; + } + + void setVisibilityUpperBound(long visibilityUpperBound) { + this.visibilityUpperBound = visibilityUpperBound; + } + + /** + * Returns the type of state change represented. + */ + public State getState() { + return state; + } + + void setState(State state) { + this.state = state; + } + + /** + * Returns any expiration timestamp (in milliseconds) associated with the state change. 
This should only + * be populated for changes of type {@link State#INPROGRESS}. + */ + public long getExpiration() { + return expirationDate; + } + + void setExpiration(long expirationDate) { + this.expirationDate = expirationDate; + } + + /** + * @return the set of changed row keys associated with the state change. This is only populated for edits + * of type {@link State#COMMITTING} or {@link State#COMMITTED}. + */ + public Set<ChangeId> getChanges() { + return changes; + } + + void setChanges(Set<ChangeId> changes) { + this.changes = changes; + } + + /** + * Returns the write pointer used to commit the row key change set. This is only populated for edits of type + * {@link State#COMMITTED}. + */ + public long getCommitPointer() { + return commitPointer; + } + + void setCommitPointer(long commitPointer) { + this.commitPointer = commitPointer; + } + + /** + * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits + * of type {@link State#COMMITTED}. + */ + public boolean getCanCommit() { + return canCommit; + } + + void setCanCommit(boolean canCommit) { + this.canCommit = canCommit; + } + + /** + * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or + * {@link State#ABORTED}. + */ + public TransactionType getType() { + return type; + } + + void setType(TransactionType type) { + this.type = type; + } + + /** + * Returns the transaction ids to be removed from invalid transaction list. This is only populated for + * edits of type {@link State#TRUNCATE_INVALID_TX} + */ + public Set<Long> getTruncateInvalidTx() { + return truncateInvalidTx; + } + + void setTruncateInvalidTx(Set<Long> truncateInvalidTx) { + this.truncateInvalidTx = truncateInvalidTx; + } + + /** + * Returns the time until which the invalid transactions need to be truncated from invalid transaction list. 
+ * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX} + */ + public long getTruncateInvalidTxTime() { + return truncateInvalidTxTime; + } + + void setTruncateInvalidTxTime(long truncateInvalidTxTime) { + this.truncateInvalidTxTime = truncateInvalidTxTime; + } + + /** + * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type + * {@link State#CHECKPOINT} + */ + public long getParentWritePointer() { + return parentWritePointer; + } + + void setParentWritePointer(long parentWritePointer) { + this.parentWritePointer = parentWritePointer; + } + + /** + * Returns the checkpoint write pointers for the edit. This is only populated for edits of type + * {@link State#ABORTED}. + */ + public long[] getCheckpointPointers() { + return checkpointPointers; + } + + void setCheckpointPointers(long[] checkpointPointers) { + this.checkpointPointers = checkpointPointers; + } + + /** + * Creates a new instance in the {@link State#INPROGRESS} state. + */ + public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound, + long expirationDate, TransactionType type) { + return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS, + expirationDate, null, 0L, false, type, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#COMMITTING} state. + */ + public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) { + return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#COMMITTED} state. 
+ */ + public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer, + boolean canCommit) { + return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null, + null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#ABORTED} state. + */ + public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) { + return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L, + checkpointPointers); + } + + /** + * Creates a new instance in the {@link State#INVALID} state. + */ + public static TransactionEdit createInvalid(long writePointer) { + return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#MOVE_WATERMARK} state. + */ + public static TransactionEdit createMoveWatermark(long writePointer) { + return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. + */ + public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) { + return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx, + 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. + */ + public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) { + return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null, + truncateInvalidTxTime, 0L, null); + } + + /** + * Creates a new instance in the {@link State#CHECKPOINT} state. 
+ */ + public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) { + return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L, + parentWritePointer, null); + } + + @Override + public void write(DataOutput out) throws IOException { + TransactionEditCodecs.encode(this, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + TransactionEditCodecs.decode(this, in); + } + + @Override + public final boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TransactionEdit)) { + return false; + } + + TransactionEdit that = (TransactionEdit) o; + + return Objects.equal(this.writePointer, that.writePointer) && + Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) && + Objects.equal(this.commitPointer, that.commitPointer) && + Objects.equal(this.expirationDate, that.expirationDate) && + Objects.equal(this.state, that.state) && + Objects.equal(this.changes, that.changes) && + Objects.equal(this.canCommit, that.canCommit) && + Objects.equal(this.type, that.type) && + Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) && + Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) && + Objects.equal(this.parentWritePointer, that.parentWritePointer) && + Arrays.equals(this.checkpointPointers, that.checkpointPointers); + } + + @Override + public final int hashCode() { + return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes, + canCommit, type, parentWritePointer, checkpointPointers); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("writePointer", writePointer) + .add("visibilityUpperBound", visibilityUpperBound) + .add("commitPointer", commitPointer) + .add("expiration", expirationDate) + .add("state", state) + .add("changesSize", changes != null ? 
changes.size() : 0) + .add("canCommit", canCommit) + .add("type", type) + .add("truncateInvalidTx", truncateInvalidTx) + .add("truncateInvalidTxTime", truncateInvalidTxTime) + .add("parentWritePointer", parentWritePointer) + .add("checkpointPointers", checkpointPointers) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java new file mode 100644 index 0000000..387ad41 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.persist; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionType; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility + * with older versions of the serialized data. + */ +public class TransactionEditCodecs { + + private static final TransactionEditCodec[] ALL_CODECS = { + new TransactionEditCodecV1(), + new TransactionEditCodecV2(), + new TransactionEditCodecV3(), + new TransactionEditCodecV4() + }; + + private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>(); + static { + for (TransactionEditCodec codec : ALL_CODECS) { + CODECS.put(codec.getVersion(), codec); + } + } + + /** + * Deserializes the encoded data from the given input stream, setting the values as fields + * on the given {@code TransactionEdit} instances. This method expects first value in the + * {code DataInput} to be a byte representing the codec version used to serialize the instance. + * + * @param dest the transaction edit to populate with the deserialized values + * @param in the input stream containing the encoded data + * @throws IOException if an error occurs while deserializing from the input stream + */ + public static void decode(TransactionEdit dest, DataInput in) throws IOException { + byte version = in.readByte(); + TransactionEditCodec codec = CODECS.get(version); + if (codec == null) { + throw new IOException("TransactionEdit was serialized with an unknown codec version " + version + + ". Was it written with a newer version of Tephra?"); + } + codec.decode(dest, in); + } + + /** + * Serializes the given {@code TransactionEdit} instance with the latest available codec. 
+ * This will first write out the version of the codec used to serialize the instance so that + * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}. + * + * @param src the transaction edit to serialize + * @param out the output stream to contain the data + * @throws IOException if an error occurs while serializing to the output stream + */ + public static void encode(TransactionEdit src, DataOutput out) throws IOException { + TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey()); + out.writeByte(latestCodec.getVersion()); + latestCodec.encode(src, out); + } + + /** + * Encodes the given transaction edit using a specific codec. Note that this is only exposed + * for use by tests. + */ + @VisibleForTesting + static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException { + out.writeByte(codec.getVersion()); + codec.encode(src, out); + } + + /** + * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from + * binary representations. + */ + interface TransactionEditCodec { + /** + * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit} + * instance. + * + * @param dest the instance on which to set all the deserialized values + * @param in the input stream containing the serialized data + * @throws IOException if an error occurs while deserializing the data + */ + void decode(TransactionEdit dest, DataInput in) throws IOException; + + /** + * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data + * output stream. + * + * @param src the instance to serialize to the stream + * @param out the output stream to contain the data + * @throws IOException if an error occurs while serializing the data + */ + void encode(TransactionEdit src, DataOutput out) throws IOException; + + /** + * Returns the version number for this codec. 
Each codec should use a unique version number, with the newest + * codec having the lowest number. + */ + byte getVersion(); + } + + + // package-private for unit-test access + static class TransactionEditCodecV1 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + dest.setWritePointer(in.readLong()); + int stateIdx = in.readInt(); + try { + dest.setState(TransactionEdit.State.values()[stateIdx]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("State enum ordinal value is out of range: " + stateIdx); + } + dest.setExpiration(in.readLong()); + dest.setCommitPointer(in.readLong()); + dest.setCanCommit(in.readBoolean()); + int changeSize = in.readInt(); + Set<ChangeId> changes = Sets.newHashSet(); + for (int i = 0; i < changeSize; i++) { + int currentLength = in.readInt(); + byte[] currentBytes = new byte[currentLength]; + in.readFully(currentBytes); + changes.add(new ChangeId(currentBytes)); + } + dest.setChanges(changes); + // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until + // this tx is finished, but correctness will be preserved. 
+ dest.setVisibilityUpperBound(0); + } + + /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for + * unit-tests only */ + @Override + @Deprecated + public void encode(TransactionEdit src, DataOutput out) throws IOException { + out.writeLong(src.getWritePointer()); + // use ordinal for predictable size, though this does not support evolution + out.writeInt(src.getState().ordinal()); + out.writeLong(src.getExpiration()); + out.writeLong(src.getCommitPointer()); + out.writeBoolean(src.getCanCommit()); + Set<ChangeId> changes = src.getChanges(); + if (changes == null) { + out.writeInt(0); + } else { + out.writeInt(changes.size()); + for (ChangeId c : changes) { + byte[] cKey = c.getKey(); + out.writeInt(cKey.length); + out.write(cKey); + } + } + // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2 + // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, + // it was added in V3 + } + + @Override + public byte getVersion() { + return -1; + } + } + + // package-private for unit-test access + static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + dest.setVisibilityUpperBound(in.readLong()); + } + + /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for + * unit-tests only */ + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + out.writeLong(src.getVisibilityUpperBound()); + // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, + // it was added in V3 + } + + @Override + public byte getVersion() { + return -2; + } + } + + // TODO: refactor to avoid duplicate code among different version of codecs + // package-private for unit-test access + static class 
TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + int typeIdx = in.readInt(); + // null transaction type is represented as -1 + if (typeIdx < 0) { + dest.setType(null); + } else { + try { + dest.setType(TransactionType.values()[typeIdx]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Type enum ordinal value is out of range: " + typeIdx); + } + } + + int truncateInvalidTxSize = in.readInt(); + Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx()); + for (int i = 0; i < truncateInvalidTxSize; i++) { + truncateInvalidTx.add(in.readLong()); + } + dest.setTruncateInvalidTx(truncateInvalidTx); + dest.setTruncateInvalidTxTime(in.readLong()); + } + + private <T> Set<T> emptySet(Set<T> set) { + if (set == null) { + return Sets.newHashSet(); + } + set.clear(); + return set; + } + + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + // null transaction type is represented as -1 + if (src.getType() == null) { + out.writeInt(-1); + } else { + out.writeInt(src.getType().ordinal()); + } + + Set<Long> truncateInvalidTx = src.getTruncateInvalidTx(); + if (truncateInvalidTx == null) { + out.writeInt(0); + } else { + out.writeInt(truncateInvalidTx.size()); + for (long id : truncateInvalidTx) { + out.writeLong(id); + } + } + out.writeLong(src.getTruncateInvalidTxTime()); + } + + @Override + public byte getVersion() { + return -3; + } + } + + static class TransactionEditCodecV4 extends TransactionEditCodecV3 { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + dest.setParentWritePointer(in.readLong()); + int checkpointPointersLen = in.readInt(); + if (checkpointPointersLen >= 0) { + long[] checkpointPointers = new long[checkpointPointersLen]; + for (int i = 0; i 
< checkpointPointersLen; i++) { + checkpointPointers[i] = in.readLong(); + } + dest.setCheckpointPointers(checkpointPointers); + } + } + + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + out.writeLong(src.getParentWritePointer()); + long[] checkpointPointers = src.getCheckpointPointers(); + if (checkpointPointers == null) { + out.writeInt(-1); + } else { + out.writeInt(checkpointPointers.length); + for (int i = 0; i < checkpointPointers.length; i++) { + out.writeLong(checkpointPointers[i]); + } + } + } + + @Override + public byte getVersion() { + return -4; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java new file mode 100644 index 0000000..e453523 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.persist; + +import java.io.IOException; +import java.util.List; + +/** + * Represents a log of transaction state changes. + */ +public interface TransactionLog { + + String getName(); + + long getTimestamp(); + + void append(TransactionEdit edit) throws IOException; + + void append(List<TransactionEdit> edits) throws IOException; + + void close() throws IOException; + + TransactionLogReader getReader() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java new file mode 100644 index 0000000..51fda0b --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Represents a reader for {@link TransactionLog} instances. 
+ */ +public interface TransactionLogReader extends Closeable { + /** + * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null} + * if the end of the file has been reached. + */ + TransactionEdit next() throws IOException; + + /** + * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the + * log file. + * @param reuse The {@code TransactionEdit} instance to populate with the log entry data. + * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached. + * @throws IOException If an error is encountered reading the log data. + */ + TransactionEdit next(TransactionEdit reuse) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java new file mode 100644 index 0000000..14893ac --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}. + */ +public interface TransactionLogWriter extends Closeable { + /** + * Adds a new transaction entry to the log. Note that this does not guarantee that the entry has been flushed + * to persistent storage until {@link #sync()} has been called. + * + * @param entry The transaction edit to append. + * @throws IOException If an error occurs while writing the edit to storage. + */ + void append(AbstractTransactionLog.Entry entry) throws IOException; + + /** + * Makes an entry of number of transaction entries that will follow in that log in a single sync. + * + * @param count Number of transaction entries. + * @throws IOException If an error occurs while writing the count to storage. + */ + void commitMarker(int count) throws IOException; + + /** + * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)}, + * but not yet flushed to durable storage. + * + * @throws IOException If an error occurs while flushing the outstanding edits. 
+ */ + void sync() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java new file mode 100644 index 0000000..ccf7374 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionManager; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; + +/** + * Represents an in-memory snapshot of the full transaction state. 
*/
public class TransactionSnapshot implements TransactionVisibilityState {
  // Every field is assigned exactly once during construction; the collections themselves are held by
  // reference (see copyFrom(...) for an isolated copy).
  private final long timestamp;
  private final long readPointer;
  private final long writePointer;
  private final Collection<Long> invalid;
  private final NavigableMap<Long, TransactionManager.InProgressTx> inProgress;
  private final Map<Long, Set<ChangeId>> committingChangeSets;
  private final Map<Long, Set<ChangeId>> committedChangeSets;

  /**
   * Creates a snapshot including the committing and committed change sets.
   * <p>
   * The given collections are stored by reference, not copied. Use
   * {@link #copyFrom(long, long, long, Collection, NavigableMap, Map, NavigableMap)} to obtain a snapshot that is
   * isolated from later modification of the source collections.
   *
   * @param timestamp time at which the snapshot was taken
   * @param readPointer transaction read pointer at snapshot time
   * @param writePointer transaction write pointer at snapshot time
   * @param invalid invalid transaction write pointers
   * @param inProgress in-progress transactions keyed by write pointer
   * @param committing change sets of transactions that passed {@code canCommit()} but have not yet committed,
   *                   keyed by write pointer
   * @param committed change sets of committed transactions, keyed by write pointer
   */
  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
                             Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
    this.timestamp = timestamp;
    this.readPointer = readPointer;
    this.writePointer = writePointer;
    this.invalid = invalid;
    this.inProgress = inProgress;
    this.committingChangeSets = committing;
    this.committedChangeSets = committed;
  }

  /**
   * Creates a snapshot with empty committing and committed change sets.
   * <p>
   * The given collections are stored by reference, not copied.
   *
   * @param timestamp time at which the snapshot was taken
   * @param readPointer transaction read pointer at snapshot time
   * @param writePointer transaction write pointer at snapshot time
   * @param invalid invalid transaction write pointers
   * @param inProgress in-progress transactions keyed by write pointer
   */
  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
    this(timestamp, readPointer, writePointer, invalid, inProgress,
         Collections.<Long, Set<ChangeId>>emptyMap(), Collections.<Long, Set<ChangeId>>emptyMap());
  }

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public long getReadPointer() {
    return readPointer;
  }

  @Override
  public long getWritePointer() {
    return writePointer;
  }

  @Override
  public Collection<Long> getInvalid() {
    return invalid;
  }

  @Override
  public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() {
    return inProgress;
  }

  @Override
  public long getVisibilityUpperBound() {
    // The oldest in-progress transaction (lowest key) determines the oldest state still in use.
    // TODO: the visibility upper bound does not move for the whole duration of a long-running transaction.
    Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry();
    if (firstInProgress == null) {
      // No transactions are in progress: the read pointer is the smallest visible id.
      return readPointer;
    }
    return firstInProgress.getValue().getVisibilityUpperBound();
  }

  /**
   * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called
   * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called
   * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
   *
   * @return a map of transaction write pointer to set of changed row keys.
   */
  public Map<Long, Set<ChangeId>> getCommittingChangeSets() {
    return committingChangeSets;
  }

  /**
   * Returns a map of transaction write pointer to set of changed row keys for transactions that had successfully
   * called {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
   *
   * @return a map of transaction write pointer to set of changed row keys.
   */
  public Map<Long, Set<ChangeId>> getCommittedChangeSets() {
    return committedChangeSets;
  }

  /**
   * Checks that this instance matches another {@code TransactionSnapshot} instance. Note that the equality check
   * ignores the snapshot timestamp value, but includes all other properties.
   *
   * @param obj the other instance to check for equality.
   * @return {@code true} if the instances are equal, {@code false} if not.
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof TransactionSnapshot)) {
      return false;
    }
    TransactionSnapshot other = (TransactionSnapshot) obj;
    return readPointer == other.readPointer &&
      writePointer == other.writePointer &&
      invalid.equals(other.invalid) &&
      inProgress.equals(other.inProgress) &&
      committingChangeSets.equals(other.committingChangeSets) &&
      committedChangeSets.equals(other.committedChangeSets);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("timestamp", timestamp)
      .add("readPointer", readPointer)
      .add("writePointer", writePointer)
      .add("invalidSize", invalid.size())
      .add("inProgressSize", inProgress.size())
      .add("committingSize", committingChangeSets.size())
      .add("committedSize", committedChangeSets.size())
      .toString();
  }

  /**
   * Hash code consistent with {@link #equals(Object)}: the snapshot timestamp is excluded.
   */
  @Override
  public int hashCode() {
    return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets);
  }

  /**
   * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
   *
   * @param snapshotTime time at which the snapshot is taken
   * @param readPointer current transaction read pointer
   * @param writePointer current transaction write pointer
   * @param invalid current list of invalid write pointers
   * @param inProgress current map of write pointers to in-progress transactions
   * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
   *                   yet committed
   * @param committed current map of write pointers to change sets which have committed
   * @return a new {@code TransactionSnapshot} instance
   */
  public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
                                             long writePointer, Collection<Long> invalid,
                                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
                                             Map<Long, Set<ChangeId>> committing,
                                             NavigableMap<Long, Set<ChangeId>> committed) {
    // copy invalid IDs
    Collection<Long> invalidCopy = Lists.newArrayList(invalid);
    // copy in-progress IDs and their transaction details
    NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);

    // for committing and committed maps, each individual Set must be copied as well to prevent modification
    Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
    for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
      committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
    }

    NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
    for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
      committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
    }

    return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
                                   invalidCopy, inProgressCopy, committingCopy, committedCopy);
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
----------------------------------------------------------------------
diff --git
a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java new file mode 100644 index 0000000..f7edc8b --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.util.concurrent.Service; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +/** + * Defines the common contract for persisting transaction state changes. + */ +public interface TransactionStateStorage extends Service { + + /** + * Persists a snapshot of transaction state to an output stream. + */ + public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException; + + /** + * Persists a snapshot of transaction state. + */ + public void writeSnapshot(TransactionSnapshot snapshot) throws IOException; + + /** + * Returns the most recent snapshot that has been successfully written. Note that this may return {@code null} + * if no completed snapshot files are found. 
+ */ + public TransactionSnapshot getLatestSnapshot() throws IOException; + + /** + * Returns the most recent transaction visibility state that has been successfully written. + * Note that this may return {@code null} if no completed snapshot files are found. + * @return {@link TransactionVisibilityState} + */ + public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException; + + /** + * Removes any snapshots prior to the {@code numberToKeep} most recent. + * + * @param numberToKeep The number of most recent snapshots to keep. + * @throws IOException If an error occurs while deleting old snapshots. + * @return The timestamp of the oldest snapshot kept. + */ + public long deleteOldSnapshots(int numberToKeep) throws IOException; + + /** + * Returns the (non-qualified) names of available snapshots. + */ + public List<String> listSnapshots() throws IOException; + + /** + * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp. Note that + * the returned list is guaranteed to be sorted in ascending timestamp order. + */ + public List<TransactionLog> getLogsSince(long timestamp) throws IOException; + + /** + * Creates a new {@link TransactionLog}. + */ + public TransactionLog createLog(long timestamp) throws IOException; + + /** + * Returns the (non-qualified) names of available logs. + */ + public List<String> listLogs() throws IOException; + + /** + * Removes any transaction logs with a timestamp older than the given value. Logs must be removed based on timestamp + * to ensure we can fully recover state based on a given snapshot. + * @param timestamp The timestamp to delete up to. Logs with a timestamp less than this value will be removed. + * @throws IOException If an error occurs while removing logs. + */ + public void deleteLogsOlderThan(long timestamp) throws IOException; + + /** + * Create the directories required for the transaction state stage. 
+ * @throws IOException If an error occurred during the creation of required directories for transaction state storage. + */ + public void setupStorage() throws IOException; + + /** + * Returns a string representation of the location used for persistence. + */ + public String getLocation(); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java new file mode 100644 index 0000000..cf845af --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import org.apache.tephra.TransactionManager; + +import java.util.Collection; +import java.util.NavigableMap; + +/** + * Transaction Visibility state contains information required by TransactionProcessor CoProcessor + * to determine cell visibility. 
+ */ +public interface TransactionVisibilityState { + + /** + * Returns the timestamp from when this snapshot was created. + */ + long getTimestamp(); + + /** + * Returns the read pointer at the time of the snapshot. + */ + long getReadPointer(); + + /** + * Returns the next write pointer at the time of the snapshot. + */ + long getWritePointer(); + + /** + * Returns the list of invalid write pointers at the time of the snapshot. + */ + Collection<Long> getInvalid(); + + /** + * Returns the map of write pointers to in-progress transactions at the time of the snapshot. + */ + NavigableMap<Long, TransactionManager.InProgressTx> getInProgress(); + + /** + * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to + * some of the currently in-progress transactions or to those that will be started <p> + * NOTE: the returned tx id can be invalid. + */ + long getVisibilityUpperBound(); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java new file mode 100644 index 0000000..01decb0 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains interfaces and implementations for persisting transaction state. + */ +package org.apache.tephra.persist; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java new file mode 100644 index 0000000..242a6fe --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tephra.rpc; + +/** + * Defines lifecycle interface for all rpc handlers. 
+ */ +public interface RPCServiceHandler { + + void init() throws Exception; + + void destroy() throws Exception; +}
