http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java deleted file mode 100644 index a462566..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.persist; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.Method; - -/** - * Utility for handling HDFS file lease recovery. This is a copy-n-paste fork of - * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12), - * which contains some additional fixes not present in our current HBase dependency version -- - * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that - * recovery succeeded. - */ -public class HDFSUtil { - private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class); - /** - * Recover the lease from HDFS, retrying multiple times. - */ - public void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf) - throws IOException { - // lease recovery not needed for local file system case. - if (!(fs instanceof DistributedFileSystem)) { - return; - } - recoverDFSFileLease((DistributedFileSystem) fs, p, conf); - } - - /* - * Run the dfs recover lease. recoverLease is asynchronous. It returns: - * -false when it starts the lease recovery (i.e. lease recovery not *yet* done) - * - true when the lease recovery has succeeded or the file is closed. - * But, we have to be careful. Each time we call recoverLease, it starts the recover lease - * process over from the beginning. We could put ourselves in a situation where we are - * doing nothing but starting a recovery, interrupting it to start again, and so on. - * The findings over in HBASE-8354 have it that the namenode will try to recover the lease - * on the file's primary node. If all is well, it should return near immediately. But, - * as is common, it is the very primary node that has crashed and so the namenode will be - * stuck waiting on a socket timeout before it will ask another datanode to start the - * recovery. It does not help if we call recoverLease in the meantime and in particular, - * subsequent to the socket timeout, a recoverLease invocation will cause us to start - * over from square one (possibly waiting on socket timeout against primary node). So, - * in the below, we do the following: - * 1. Call recoverLease. - * 2. If it returns true, break. - * 3. If it returns false, wait a few seconds and then call it again. - * 4. If it returns true, break. - * 5. If it returns false, wait for what we think the datanode socket timeout is - * (configurable) and then try again. - * 6. If it returns true, break. - * 7. If it returns false, repeat starting at step 5. above. - * - * If HDFS-4525 is available, call it every second and we might be able to exit early. - */ - boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, - final Configuration conf) - throws IOException { - LOG.info("Recovering lease on dfs file " + p); - long startWaiting = System.currentTimeMillis(); - // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS - // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves - // beyond that limit 'to be safe'. - long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; - // This setting should be what the cluster dfs heartbeat is set to. - long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000); - // This should be set to how long it'll take for us to timeout against primary datanode if it - // is dead. We set it to 61 seconds, 1 second than the default READ_TIMEOUT in HDFS, the - // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. - long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000); - - Method isFileClosedMeth = null; - // whether we need to look for isFileClosed method - boolean findIsFileClosedMeth = true; - boolean recovered = false; - // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. - for (int nbAttempt = 0; !recovered; nbAttempt++) { - recovered = recoverLease(dfs, nbAttempt, p, startWaiting); - if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { - break; - } - try { - // On the first time through wait the short 'firstPause'. - if (nbAttempt == 0) { - Thread.sleep(firstPause); - } else { - // Cycle here until subsequentPause elapses. While spinning, check isFileClosed if - // available (should be in hadoop 2.0.5... not in hadoop 1 though. - long localStartWaiting = System.currentTimeMillis(); - while ((System.currentTimeMillis() - localStartWaiting) < - subsequentPause) { - Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); - if (findIsFileClosedMeth) { - try { - isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", - new Class[]{ Path.class }); - } catch (NoSuchMethodException nsme) { - LOG.debug("isFileClosed not available"); - } finally { - findIsFileClosedMeth = false; - } - } - if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { - recovered = true; - break; - } - } - } - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; - } - } - return recovered; - } - - boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, - final int nbAttempt, final Path p, final long startWaiting) { - if (recoveryTimeout < System.currentTimeMillis()) { - LOG.warn("Cannot recoverLease after trying for " + - conf.getInt("hbase.lease.recovery.timeout", 900000) + - "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + - getLogMessageDetail(nbAttempt, p, startWaiting)); - return true; - } - return false; - } - - /** - * Try to recover the lease. - * @param dfs The filesystem instance. - * @param nbAttempt Count number of this attempt. - * @param p Path of the file to recover. - * @param startWaiting Timestamp of when we started attempting to recover the file lease. - * @return True if dfs#recoverLease came by true. - * @throws java.io.FileNotFoundException - */ - boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p, - final long startWaiting) - throws FileNotFoundException { - boolean recovered = false; - try { - recovered = dfs.recoverLease(p); - LOG.info("recoverLease=" + recovered + ", " + - getLogMessageDetail(nbAttempt, p, startWaiting)); - } catch (IOException e) { - if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { - // This exception comes out instead of FNFE, fix it - throw new FileNotFoundException("The given file wasn't found at " + p); - } else if (e instanceof FileNotFoundException) { - throw (FileNotFoundException) e; - } - LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); - } - return recovered; - } - - /** - * @param nbAttempt Attempt number for the lease recovery. - * @param p Path of the file to recover. - * @param startWaiting Timestamp of when we started attempting to recover the file lease. - * @return Detail to append to any log message around lease recovering. - */ - private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) { - return "attempt=" + nbAttempt + " on file=" + p + " after " + - (System.currentTimeMillis() - startWaiting) + "ms"; - } - - /** - * Call HDFS-4525 isFileClosed if it is available. - * @param dfs Filesystem instance to use. - * @param m Method instance to call. - * @param p Path of the file to check is closed. - * @return True if file is closed. - */ - private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) { - try { - return (Boolean) m.invoke(dfs, p); - } catch (SecurityException e) { - LOG.warn("No access", e); - } catch (Exception e) { - LOG.warn("Failed invocation for " + p.toString(), e); - } - return false; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java deleted file mode 100644 index 590aaa5..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.metrics.MetricsCollector; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; - -/** - * Reads and writes transaction logs against files in the local filesystem. - */ -public class LocalFileTransactionLog extends AbstractTransactionLog { - private final File logFile; - - /** - * Creates a new transaction log using the given file instance. - * @param logFile The log file to use. - */ - public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) { - super(timestamp, metricsCollector); - this.logFile = logFile; - } - - @Override - public String getName() { - return logFile.getAbsolutePath(); - } - - @Override - protected TransactionLogWriter createWriter() throws IOException { - return new LogWriter(logFile); - } - - @Override - public TransactionLogReader getReader() throws IOException { - return new LogReader(logFile); - } - - private static final class LogWriter implements TransactionLogWriter { - private final FileOutputStream fos; - private final DataOutputStream out; - - public LogWriter(File logFile) throws IOException { - this.fos = new FileOutputStream(logFile); - this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE)); - } - - @Override - public void append(Entry entry) throws IOException { - entry.write(out); - } - - @Override - public void commitMarker(int count) throws IOException { - // skip for local file - } - - @Override - public void sync() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - out.flush(); - out.close(); - fos.close(); - } - } - - private static final class LogReader implements TransactionLogReader { - private final FileInputStream fin; - private final DataInputStream in; - private Entry reuseEntry = new Entry(); - - public LogReader(File logFile) throws IOException { - this.fin = new FileInputStream(logFile); - this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE)); - } - - @Override - public TransactionEdit next() throws IOException { - Entry entry = new Entry(); - try { - entry.readFields(in); - } catch (EOFException eofe) { - // signal end of file by returning null - return null; - } - return entry.getEdit(); - } - - @Override - public TransactionEdit next(TransactionEdit reuse) throws IOException { - try { - reuseEntry.getKey().readFields(in); - reuse.readFields(in); - } catch (EOFException eofe) { - // signal end of file by returning null - return null; - } - return reuse; - } - - @Override - public void close() throws IOException { - in.close(); - fin.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java deleted file mode 100644 index 4c3539d..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; -import com.google.common.io.Files; -import com.google.common.primitives.Longs; -import com.google.inject.Inject; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; - -/** - * Persists transaction snapshots and write-ahead logs to files on the local filesystem. - */ -public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage { - private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress."; - private static final String SNAPSHOT_FILE_PREFIX = "snapshot."; - private static final String LOG_FILE_PREFIX = "txlog."; - private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class); - static final int BUFFER_SIZE = 16384; - - private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() { - @Override - public boolean accept(File file, String s) { - return s.startsWith(SNAPSHOT_FILE_PREFIX); - } - }; - - private final String configuredSnapshotDir; - private final MetricsCollector metricsCollector; - private File snapshotDir; - - @Inject - public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider, - MetricsCollector metricsCollector) { - super(codecProvider); - this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR); - this.metricsCollector = metricsCollector; - } - - @Override - protected void startUp() throws Exception { - Preconditions.checkState(configuredSnapshotDir != null, - "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR + - " in configuration."); - snapshotDir = new File(configuredSnapshotDir); - } - - @Override - protected void shutDown() throws Exception { - // nothing to do - } - - @Override - public String getLocation() { - return snapshotDir.getAbsolutePath(); - } - - @Override - public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { - // save the snapshot to a temporary file - File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); - LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile); - OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput(); - boolean threw = true; - try { - codecProvider.encode(out, snapshot); - threw = false; - } finally { - Closeables.close(out, threw); - } - - // move the temporary file into place with the correct filename - File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); - if (!snapshotTmpFile.renameTo(finalFile)) { - throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " + - finalFile.getName()); - } - - LOG.debug("Completed snapshot to file {}", finalFile); - } - - @Override - public TransactionSnapshot getLatestSnapshot() throws IOException { - InputStream is = getLatestSnapshotInputStream(); - if (is == null) { - return null; - } - try { - return readSnapshotFile(is); - } finally { - is.close(); - } - } - - @Override - public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { - InputStream is = getLatestSnapshotInputStream(); - if (is == null) { - return null; - } - try { - return codecProvider.decodeTransactionVisibilityState(is); - } finally { - is.close(); - } - } - - private InputStream getLatestSnapshotInputStream() throws IOException { - File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); - TimestampedFilename mostRecent = null; - for (File file : snapshotFiles) { - TimestampedFilename tsFile = new TimestampedFilename(file); - if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) { - mostRecent = tsFile; - } - } - - if (mostRecent == null) { - LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath()); - return null; - } - - return new FileInputStream(mostRecent.getFile()); - } - - private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException { - return codecProvider.decode(is); - } - - @Override - public long deleteOldSnapshots(int numberToKeep) throws IOException { - File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); - if (snapshotFiles.length == 0) { - return -1; - } - TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length]; - for (int i = 0; i < snapshotFiles.length; i++) { - snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]); - } - Arrays.sort(snapshotFilenames, Collections.reverseOrder()); - if (snapshotFilenames.length <= numberToKeep) { - // nothing to delete, just return the oldest timestamp - return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp(); - } - int toRemoveCount = snapshotFilenames.length - numberToKeep; - TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount]; - System.arraycopy(snapshotFilenames, numberToKeep, toRemove, 0, toRemoveCount); - int removedCnt = 0; - for (int i = 0; i < toRemove.length; i++) { - File currentFile = toRemove[i].getFile(); - LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath()); - if (!toRemove[i].getFile().delete()) { - LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath()); - } else { - removedCnt++; - } - } - long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp(); - LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp); - return oldestTimestamp; - } - - @Override - public List<String> listSnapshots() throws IOException { - File[] snapshots = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER); - return Lists.transform(Arrays.asList(snapshots), new Function<File, String>() { - @Nullable - @Override - public String apply(@Nullable File input) { - return input.getName(); - } - }); - } - - @Override - public List<TransactionLog> getLogsSince(long timestamp) throws IOException { - File[] logFiles = snapshotDir.listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE)); - TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length]; - for (int i = 0; i < logFiles.length; i++) { - timestampedFiles[i] = new TimestampedFilename(logFiles[i]); - } - // logs need to be processed in ascending order - Arrays.sort(timestampedFiles); - return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() { - @Nullable - @Override - public TransactionLog apply(@Nullable TimestampedFilename input) { - return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector); - } - }); - } - - @Override - public TransactionLog createLog(long timestamp) throws IOException { - File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp); - LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath()); - return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector); - } - - @Override - public void deleteLogsOlderThan(long timestamp) throws IOException { - File[] logFiles = snapshotDir.listFiles(new LogFileFilter(0, timestamp)); - int removedCnt = 0; - for (File file : logFiles) { - LOG.debug("Removing old transaction log {}", file.getPath()); - if (file.delete()) { - removedCnt++; - } else { - LOG.warn("Failed to remove log file {}", file.getAbsolutePath()); - } - } - LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp); - } - - @Override - public void setupStorage() throws IOException { - // create the directory if it doesn't exist - if (!snapshotDir.exists()) { - if (!snapshotDir.mkdirs()) { - throw new IOException("Failed to create directory " + configuredSnapshotDir + - " for transaction snapshot storage"); - } - } else { - Preconditions.checkState(snapshotDir.isDirectory(), - "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!"); - Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " + - configuredSnapshotDir + " exists but is not writable!"); - } - } - - @Override - public List<String> listLogs() throws IOException { - File[] logs = snapshotDir.listFiles(new LogFileFilter(0, Long.MAX_VALUE)); - return Lists.transform(Arrays.asList(logs), new Function<File, String>() { - @Nullable - @Override - public String apply(@Nullable File input) { - return input.getName(); - } - }); - } - - private static class LogFileFilter implements FilenameFilter { - private final long startTime; - private final long endTime; - - public LogFileFilter(long startTime, long endTime) { - this.startTime = startTime; - this.endTime = endTime; - } - - @Override - public boolean accept(File file, String s) { - if (s.startsWith(LOG_FILE_PREFIX)) { - String[] parts = s.split("\\."); - if (parts.length == 2) { - try { - long fileTime = Long.parseLong(parts[1]); - return fileTime >= startTime && fileTime < endTime; - } catch (NumberFormatException ignored) { - LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s); - } - } - } - return false; - } - } - - /** - * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both - * snapshot and transaction log filenames. - */ - private static class TimestampedFilename implements Comparable<TimestampedFilename> { - private File file; - private String prefix; - private long timestamp; - - public TimestampedFilename(File file) { - this.file = file; - String[] parts = file.getName().split("\\."); - if (parts.length != 2) { - throw new IllegalArgumentException("Filename " + file.getName() + - " did not match the expected pattern prefix.timestamp"); - } - prefix = parts[0]; - timestamp = Long.parseLong(parts[1]); - } - - public File getFile() { - return file; - } - - public String getPrefix() { - return prefix; - } - - public long getTimestamp() { - return timestamp; - } - - @Override - public int compareTo(TimestampedFilename other) { - int res = prefix.compareTo(other.getPrefix()); - if (res == 0) { - res = Longs.compare(timestamp, other.getTimestamp()); - } - return res; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java deleted file mode 100644 index 3b6d0b7..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.snapshot.SnapshotCodec; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.inject.Inject; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -/** - * Minimal {@link TransactionStateStorage} implementation that does nothing, i.e. does not maintain any actual state. - */ -public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { - - private final SnapshotCodec codec; - - @Inject - public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) { - codec = codecProvider; - } - - @Override - protected void startUp() throws Exception { - } - - @Override - protected void shutDown() throws Exception { - } - - @Override - public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { - codec.encode(out, snapshot); - } - - @Override - public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { - } - - @Override - public TransactionSnapshot getLatestSnapshot() throws IOException { - return null; - } - - @Override - public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { - return null; - } - - @Override - public long deleteOldSnapshots(int numberToKeep) throws IOException { - return 0; - } - - @Override - public List<String> listSnapshots() throws IOException { - return new ArrayList<>(0); - } - - @Override - public List<TransactionLog> getLogsSince(long timestamp) throws IOException { - return new ArrayList<>(0); - } - - @Override - public TransactionLog createLog(long timestamp) throws IOException { - return new NoOpTransactionLog(); - } - - @Override - public void deleteLogsOlderThan(long timestamp) throws IOException { - } - - @Override - public void setupStorage() throws IOException { - } - - @Override - public List<String> listLogs() throws IOException { - return new ArrayList<>(0); - } - - @Override - public String getLocation() { - return "no-op"; - } - - private static class NoOpTransactionLog implements TransactionLog { - private long timestamp = System.currentTimeMillis(); - - @Override - public String getName() { - return "no-op"; - } - - @Override - public long getTimestamp() { - return timestamp; - } - - @Override - public void append(TransactionEdit edit) throws IOException { - } - - @Override - public void append(List<TransactionEdit> edits) throws IOException { - } - - @Override - public void close() { - } - - @Override - public TransactionLogReader getReader() { - return new TransactionLogReader() { - @Override - public TransactionEdit next() { - return null; - } - - @Override - public TransactionEdit next(TransactionEdit reuse) { - return null; - } - - @Override - public void close() { - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java deleted file mode 100644 index 405bbfd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import com.google.common.base.Objects; -import com.google.common.collect.Sets; -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Set; - -/** - * Represents a transaction state change in the {@link TransactionLog}. - */ -public class TransactionEdit implements Writable { - - /** - * The possible state changes for a transaction. - */ - public enum State { - INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT - } - - private long writePointer; - - /** - * stores the value of visibility upper bound - * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()}) - * for edit of {@link State#INPROGRESS} only - */ - private long visibilityUpperBound; - private long commitPointer; - private long expirationDate; - private State state; - private Set<ChangeId> changes; - /** Whether or not the COMMITTED change should be fully committed. */ - private boolean canCommit; - private TransactionType type; - private Set<Long> truncateInvalidTx; - private long truncateInvalidTxTime; - private long parentWritePointer; - private long[] checkpointPointers; - - // for Writable - public TransactionEdit() { - this.changes = Sets.newHashSet(); - this.truncateInvalidTx = Sets.newHashSet(); - } - - // package private for testing - TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate, - Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type, - Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer, - long[] checkpointPointers) { - this.writePointer = writePointer; - this.visibilityUpperBound = visibilityUpperBound; - this.state = state; - this.expirationDate = expirationDate; - this.changes = changes != null ? changes : Collections.<ChangeId>emptySet(); - this.commitPointer = commitPointer; - this.canCommit = canCommit; - this.type = type; - this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet(); - this.truncateInvalidTxTime = truncateInvalidTxTime; - this.parentWritePointer = parentWritePointer; - this.checkpointPointers = checkpointPointers; - } - - /** - * Returns the transaction write pointer assigned for the state change. - */ - public long getWritePointer() { - return writePointer; - } - - void setWritePointer(long writePointer) { - this.writePointer = writePointer; - } - - public long getVisibilityUpperBound() { - return visibilityUpperBound; - } - - void setVisibilityUpperBound(long visibilityUpperBound) { - this.visibilityUpperBound = visibilityUpperBound; - } - - /** - * Returns the type of state change represented. - */ - public State getState() { - return state; - } - - void setState(State state) { - this.state = state; - } - - /** - * Returns any expiration timestamp (in milliseconds) associated with the state change. This should only - * be populated for changes of type {@link State#INPROGRESS}. - */ - public long getExpiration() { - return expirationDate; - } - - void setExpiration(long expirationDate) { - this.expirationDate = expirationDate; - } - - /** - * @return the set of changed row keys associated with the state change. This is only populated for edits - * of type {@link State#COMMITTING} or {@link State#COMMITTED}. - */ - public Set<ChangeId> getChanges() { - return changes; - } - - void setChanges(Set<ChangeId> changes) { - this.changes = changes; - } - - /** - * Returns the write pointer used to commit the row key change set. This is only populated for edits of type - * {@link State#COMMITTED}. - */ - public long getCommitPointer() { - return commitPointer; - } - - void setCommitPointer(long commitPointer) { - this.commitPointer = commitPointer; - } - - /** - * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits - * of type {@link State#COMMITTED}. - */ - public boolean getCanCommit() { - return canCommit; - } - - void setCanCommit(boolean canCommit) { - this.canCommit = canCommit; - } - - /** - * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or - * {@link State#ABORTED}. - */ - public TransactionType getType() { - return type; - } - - void setType(TransactionType type) { - this.type = type; - } - - /** - * Returns the transaction ids to be removed from invalid transaction list. This is only populated for - * edits of type {@link State#TRUNCATE_INVALID_TX} - */ - public Set<Long> getTruncateInvalidTx() { - return truncateInvalidTx; - } - - void setTruncateInvalidTx(Set<Long> truncateInvalidTx) { - this.truncateInvalidTx = truncateInvalidTx; - } - - /** - * Returns the time until which the invalid transactions need to be truncated from invalid transaction list. - * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX} - */ - public long getTruncateInvalidTxTime() { - return truncateInvalidTxTime; - } - - void setTruncateInvalidTxTime(long truncateInvalidTxTime) { - this.truncateInvalidTxTime = truncateInvalidTxTime; - } - - /** - * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type - * {@link State#CHECKPOINT} - */ - public long getParentWritePointer() { - return parentWritePointer; - } - - void setParentWritePointer(long parentWritePointer) { - this.parentWritePointer = parentWritePointer; - } - - /** - * Returns the checkpoint write pointers for the edit. This is only populated for edits of type - * {@link State#ABORTED}. - */ - public long[] getCheckpointPointers() { - return checkpointPointers; - } - - void setCheckpointPointers(long[] checkpointPointers) { - this.checkpointPointers = checkpointPointers; - } - - /** - * Creates a new instance in the {@link State#INPROGRESS} state. - */ - public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound, - long expirationDate, TransactionType type) { - return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS, - expirationDate, null, 0L, false, type, null, 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#COMMITTING} state. - */ - public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) { - return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#COMMITTED} state. - */ - public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer, - boolean canCommit) { - return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null, - null, 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#ABORTED} state. - */ - public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) { - return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L, - checkpointPointers); - } - - /** - * Creates a new instance in the {@link State#INVALID} state. - */ - public static TransactionEdit createInvalid(long writePointer) { - return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#MOVE_WATERMARK} state. - */ - public static TransactionEdit createMoveWatermark(long writePointer) { - return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. - */ - public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) { - return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx, - 0L, 0L, null); - } - - /** - * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. - */ - public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) { - return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null, - truncateInvalidTxTime, 0L, null); - } - - /** - * Creates a new instance in the {@link State#CHECKPOINT} state. - */ - public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) { - return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L, - parentWritePointer, null); - } - - @Override - public void write(DataOutput out) throws IOException { - TransactionEditCodecs.encode(this, out); - } - - @Override - public void readFields(DataInput in) throws IOException { - TransactionEditCodecs.decode(this, in); - } - - @Override - public final boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof TransactionEdit)) { - return false; - } - - TransactionEdit that = (TransactionEdit) o; - - return Objects.equal(this.writePointer, that.writePointer) && - Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) && - Objects.equal(this.commitPointer, that.commitPointer) && - Objects.equal(this.expirationDate, that.expirationDate) && - Objects.equal(this.state, that.state) && - Objects.equal(this.changes, that.changes) && - Objects.equal(this.canCommit, that.canCommit) && - Objects.equal(this.type, that.type) && - Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) && - Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) && - Objects.equal(this.parentWritePointer, that.parentWritePointer) && - Arrays.equals(this.checkpointPointers, that.checkpointPointers); - } - - @Override - public final int hashCode() { - return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes, - canCommit, type, parentWritePointer, checkpointPointers); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("writePointer", writePointer) - .add("visibilityUpperBound", visibilityUpperBound) - .add("commitPointer", commitPointer) - .add("expiration", expirationDate) - .add("state", state) - .add("changesSize", changes != null ? changes.size() : 0) - .add("canCommit", canCommit) - .add("type", type) - .add("truncateInvalidTx", truncateInvalidTx) - .add("truncateInvalidTxTime", truncateInvalidTxTime) - .add("parentWritePointer", parentWritePointer) - .add("checkpointPointers", checkpointPointers) - .toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java deleted file mode 100644 index e18b73f..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionType; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; - -/** - * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility - * with older versions of the serialized data. - */ -public class TransactionEditCodecs { - - private static final TransactionEditCodec[] ALL_CODECS = { - new TransactionEditCodecV1(), - new TransactionEditCodecV2(), - new TransactionEditCodecV3(), - new TransactionEditCodecV4() - }; - - private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>(); - static { - for (TransactionEditCodec codec : ALL_CODECS) { - CODECS.put(codec.getVersion(), codec); - } - } - - /** - * Deserializes the encoded data from the given input stream, setting the values as fields - * on the given {@code TransactionEdit} instances. This method expects first value in the - * {code DataInput} to be a byte representing the codec version used to serialize the instance. - * - * @param dest the transaction edit to populate with the deserialized values - * @param in the input stream containing the encoded data - * @throws IOException if an error occurs while deserializing from the input stream - */ - public static void decode(TransactionEdit dest, DataInput in) throws IOException { - byte version = in.readByte(); - TransactionEditCodec codec = CODECS.get(version); - if (codec == null) { - throw new IOException("TransactionEdit was serialized with an unknown codec version " + version + - ". Was it written with a newer version of Tephra?"); - } - codec.decode(dest, in); - } - - /** - * Serializes the given {@code TransactionEdit} instance with the latest available codec. - * This will first write out the version of the codec used to serialize the instance so that - * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}. - * - * @param src the transaction edit to serialize - * @param out the output stream to contain the data - * @throws IOException if an error occurs while serializing to the output stream - */ - public static void encode(TransactionEdit src, DataOutput out) throws IOException { - TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey()); - out.writeByte(latestCodec.getVersion()); - latestCodec.encode(src, out); - } - - /** - * Encodes the given transaction edit using a specific codec. Note that this is only exposed - * for use by tests. - */ - @VisibleForTesting - static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException { - out.writeByte(codec.getVersion()); - codec.encode(src, out); - } - - /** - * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from - * binary representations. - */ - interface TransactionEditCodec { - /** - * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit} - * instance. - * - * @param dest the instance on which to set all the deserialized values - * @param in the input stream containing the serialized data - * @throws IOException if an error occurs while deserializing the data - */ - void decode(TransactionEdit dest, DataInput in) throws IOException; - - /** - * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data - * output stream. - * - * @param src the instance to serialize to the stream - * @param out the output stream to contain the data - * @throws IOException if an error occurs while serializing the data - */ - void encode(TransactionEdit src, DataOutput out) throws IOException; - - /** - * Returns the version number for this codec. Each codec should use a unique version number, with the newest - * codec having the lowest number. - */ - byte getVersion(); - } - - - // package-private for unit-test access - static class TransactionEditCodecV1 implements TransactionEditCodec { - @Override - public void decode(TransactionEdit dest, DataInput in) throws IOException { - dest.setWritePointer(in.readLong()); - int stateIdx = in.readInt(); - try { - dest.setState(TransactionEdit.State.values()[stateIdx]); - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("State enum ordinal value is out of range: " + stateIdx); - } - dest.setExpiration(in.readLong()); - dest.setCommitPointer(in.readLong()); - dest.setCanCommit(in.readBoolean()); - int changeSize = in.readInt(); - Set<ChangeId> changes = Sets.newHashSet(); - for (int i = 0; i < changeSize; i++) { - int currentLength = in.readInt(); - byte[] currentBytes = new byte[currentLength]; - in.readFully(currentBytes); - changes.add(new ChangeId(currentBytes)); - } - dest.setChanges(changes); - // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until - // this tx is finished, but correctness will be preserved. - dest.setVisibilityUpperBound(0); - } - - /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for - * unit-tests only */ - @Override - @Deprecated - public void encode(TransactionEdit src, DataOutput out) throws IOException { - out.writeLong(src.getWritePointer()); - // use ordinal for predictable size, though this does not support evolution - out.writeInt(src.getState().ordinal()); - out.writeLong(src.getExpiration()); - out.writeLong(src.getCommitPointer()); - out.writeBoolean(src.getCanCommit()); - Set<ChangeId> changes = src.getChanges(); - if (changes == null) { - out.writeInt(0); - } else { - out.writeInt(changes.size()); - for (ChangeId c : changes) { - byte[] cKey = c.getKey(); - out.writeInt(cKey.length); - out.write(cKey); - } - } - // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2 - // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, - // it was added in V3 - } - - @Override - public byte getVersion() { - return -1; - } - } - - // package-private for unit-test access - static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec { - @Override - public void decode(TransactionEdit dest, DataInput in) throws IOException { - super.decode(dest, in); - dest.setVisibilityUpperBound(in.readLong()); - } - - /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for - * unit-tests only */ - @Override - public void encode(TransactionEdit src, DataOutput out) throws IOException { - super.encode(src, out); - out.writeLong(src.getVisibilityUpperBound()); - // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, - // it was added in V3 - } - - @Override - public byte getVersion() { - return -2; - } - } - - // TODO: refactor to avoid duplicate code among different version of codecs - // package-private for unit-test access - static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec { - @Override - public void decode(TransactionEdit dest, DataInput in) throws IOException { - super.decode(dest, in); - int typeIdx = in.readInt(); - // null transaction type is represented as -1 - if (typeIdx < 0) { - dest.setType(null); - } else { - try { - dest.setType(TransactionType.values()[typeIdx]); - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("Type enum ordinal value is out of range: " + typeIdx); - } - } - - int truncateInvalidTxSize = in.readInt(); - Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx()); - for (int i = 0; i < truncateInvalidTxSize; i++) { - truncateInvalidTx.add(in.readLong()); - } - dest.setTruncateInvalidTx(truncateInvalidTx); - dest.setTruncateInvalidTxTime(in.readLong()); - } - - private <T> Set<T> emptySet(Set<T> set) { - if (set == null) { - return Sets.newHashSet(); - } - set.clear(); - return set; - } - - @Override - public void encode(TransactionEdit src, DataOutput out) throws IOException { - super.encode(src, out); - // null transaction type is represented as -1 - if (src.getType() == null) { - out.writeInt(-1); - } else { - out.writeInt(src.getType().ordinal()); - } - - Set<Long> truncateInvalidTx = src.getTruncateInvalidTx(); - if (truncateInvalidTx == null) { - out.writeInt(0); - } else { - out.writeInt(truncateInvalidTx.size()); - for (long id : truncateInvalidTx) { - out.writeLong(id); - } - } - out.writeLong(src.getTruncateInvalidTxTime()); - } - - @Override - public byte getVersion() { - return -3; - } - } - - static class TransactionEditCodecV4 extends TransactionEditCodecV3 { - @Override - public void decode(TransactionEdit dest, DataInput in) throws IOException { - super.decode(dest, in); - dest.setParentWritePointer(in.readLong()); - int checkpointPointersLen = in.readInt(); - if (checkpointPointersLen >= 0) { - long[] checkpointPointers = new long[checkpointPointersLen]; - for (int i = 0; i < checkpointPointersLen; i++) { - checkpointPointers[i] = in.readLong(); - } - dest.setCheckpointPointers(checkpointPointers); - } - } - - @Override - public void encode(TransactionEdit src, DataOutput out) throws IOException { - super.encode(src, out); - out.writeLong(src.getParentWritePointer()); - long[] checkpointPointers = src.getCheckpointPointers(); - if (checkpointPointers == null) { - out.writeInt(-1); - } else { - out.writeInt(checkpointPointers.length); - for (int i = 0; i < checkpointPointers.length; i++) { - out.writeLong(checkpointPointers[i]); - } - } - } - - @Override - public byte getVersion() { - return -4; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java deleted file mode 100644 index f76a8f1..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import java.io.IOException; -import java.util.List; - -/** - * Represents a log of transaction state changes. - */ -public interface TransactionLog { - - String getName(); - - long getTimestamp(); - - void append(TransactionEdit edit) throws IOException; - - void append(List<TransactionEdit> edits) throws IOException; - - void close() throws IOException; - - TransactionLogReader getReader() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java deleted file mode 100644 index 3a3eaca..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Represents a reader for {@link TransactionLog} instances. - */ -public interface TransactionLogReader extends Closeable { - /** - * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null} - * if the end of the file has been reached. - */ - TransactionEdit next() throws IOException; - - /** - * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the - * log file. - * @param reuse The {@code TransactionEdit} instance to populate with the log entry data. - * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached. - * @throws IOException If an error is encountered reading the log data. - */ - TransactionEdit next(TransactionEdit reuse) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java deleted file mode 100644 index 67d9aaf..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}. - */ -public interface TransactionLogWriter extends Closeable { - /** - * Adds a new transaction entry to the log. Note that this does not guarantee that the entry has been flushed - * to persistent storage until {@link #sync()} has been called. - * - * @param entry The transaction edit to append. - * @throws IOException If an error occurs while writing the edit to storage. - */ - void append(AbstractTransactionLog.Entry entry) throws IOException; - - /** - * Makes an entry of number of transaction entries that will follow in that log in a single sync. - * - * @param count Number of transaction entries. - * @throws IOException If an error occurs while writing the count to storage. - */ - void commitMarker(int count) throws IOException; - - /** - * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)}, - * but not yet flushed to durable storage. - * - * @throws IOException If an error occurs while flushing the outstanding edits. - */ - void sync() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java deleted file mode 100644 index 22bc449..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionManager; -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; - -/** - * Represents an in-memory snapshot of the full transaction state. - */ -public class TransactionSnapshot implements TransactionVisibilityState { - private long timestamp; - private long readPointer; - private long writePointer; - private Collection<Long> invalid; - private NavigableMap<Long, TransactionManager.InProgressTx> inProgress; - private Map<Long, Set<ChangeId>> committingChangeSets; - private Map<Long, Set<ChangeId>> committedChangeSets; - - public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid, - NavigableMap<Long, TransactionManager.InProgressTx> inProgress, - Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) { - this(timestamp, readPointer, writePointer, invalid, inProgress); - this.committingChangeSets = committing; - this.committedChangeSets = committed; - } - - public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid, - NavigableMap<Long, TransactionManager.InProgressTx> inProgress) { - this.timestamp = timestamp; - this.readPointer = readPointer; - this.writePointer = writePointer; - this.invalid = invalid; - this.inProgress = inProgress; - this.committingChangeSets = Collections.emptyMap(); - this.committedChangeSets = Collections.emptyMap(); - } - - @Override - public long getTimestamp() { - return timestamp; - } - - @Override - public long getReadPointer() { - return readPointer; - } - - @Override - public long getWritePointer() { - return writePointer; - } - - @Override - public Collection<Long> getInvalid() { - return invalid; - } - - @Override - public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() { - return inProgress; - } - - @Override - public long getVisibilityUpperBound() { - // the readPointer of the oldest in-progress tx is the oldest in use - // todo: potential problem with not moving visibility upper bound for the whole duration of long-running tx - Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry(); - if (firstInProgress == null) { - // using readPointer as smallest visible when non txs are there - return readPointer; - } - return firstInProgress.getValue().getVisibilityUpperBound(); - } - - /** - * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called - * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called - * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot. - * - * @return a map of transaction write pointer to set of changed row keys. - */ - public Map<Long, Set<ChangeId>> getCommittingChangeSets() { - return committingChangeSets; - } - - /** - * Returns a map of transaction write pointer to set of changed row keys for transaction that had successfully called - * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot. - * - * @return a map of transaction write pointer to set of changed row keys. - */ - public Map<Long, Set<ChangeId>> getCommittedChangeSets() { - return committedChangeSets; - } - - /** - * Checks that this instance matches another {@code TransactionSnapshot} instance. Note that the equality check - * ignores the snapshot timestamp value, but includes all other properties. - * - * @param obj the other instance to check for equality. - * @return {@code true} if the instances are equal, {@code false} if not. - */ - @Override - public boolean equals(Object obj) { - if (!(obj instanceof TransactionSnapshot)) { - return false; - } - TransactionSnapshot other = (TransactionSnapshot) obj; - return readPointer == other.readPointer && - writePointer == other.writePointer && - invalid.equals(other.invalid) && - inProgress.equals(other.inProgress) && - committingChangeSets.equals(other.committingChangeSets) && - committedChangeSets.equals(other.committedChangeSets); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("timestamp", timestamp) - .add("readPointer", readPointer) - .add("writePointer", writePointer) - .add("invalidSize", invalid.size()) - .add("inProgressSize", inProgress.size()) - .add("committingSize", committingChangeSets.size()) - .add("committedSize", committedChangeSets.size()) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets); - } - - /** - * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections. - * @param readPointer current transaction read pointer - * @param writePointer current transaction write pointer - * @param invalid current list of invalid write pointers - * @param inProgress current map of in-progress write pointers to expiration timestamps - * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not - * yet committed - * @param committed current map of write pointers to change sets which have committed - * @return a new {@code TransactionSnapshot} instance - */ - public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer, - long writePointer, Collection<Long> invalid, - NavigableMap<Long, TransactionManager.InProgressTx> inProgress, - Map<Long, Set<ChangeId>> committing, - NavigableMap<Long, Set<ChangeId>> committed) { - // copy invalid IDs - Collection<Long> invalidCopy = Lists.newArrayList(invalid); - // copy in-progress IDs and expirations - NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress); - - // for committing and committed maps, we need to copy each individual Set as well to prevent modification - Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap(); - for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) { - committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue())); - } - - NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>(); - for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) { - committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue())); - } - - return new TransactionSnapshot(snapshotTime, readPointer, writePointer, - invalidCopy, inProgressCopy, committingCopy, committedCopy); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java deleted file mode 100644 index 0acb9bb..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import com.google.common.util.concurrent.Service; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; - -/** - * Defines the common contract for persisting transaction state changes. - */ -public interface TransactionStateStorage extends Service { - - /** - * Persists a snapshot of transaction state to an output stream. - */ - public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException; - - /** - * Persists a snapshot of transaction state. - */ - public void writeSnapshot(TransactionSnapshot snapshot) throws IOException; - - /** - * Returns the most recent snapshot that has been successfully written. Note that this may return {@code null} - * if no completed snapshot files are found. - */ - public TransactionSnapshot getLatestSnapshot() throws IOException; - - /** - * Returns the most recent transaction visibility state that has been successfully written. - * Note that this may return {@code null} if no completed snapshot files are found. - * @return {@link TransactionVisibilityState} - */ - public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException; - - /** - * Removes any snapshots prior to the {@code numberToKeep} most recent. - * - * @param numberToKeep The number of most recent snapshots to keep. - * @throws IOException If an error occurs while deleting old snapshots. - * @return The timestamp of the oldest snapshot kept. - */ - public long deleteOldSnapshots(int numberToKeep) throws IOException; - - /** - * Returns the (non-qualified) names of available snapshots. - */ - public List<String> listSnapshots() throws IOException; - - /** - * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp. Note that - * the returned list is guaranteed to be sorted in ascending timestamp order. - */ - public List<TransactionLog> getLogsSince(long timestamp) throws IOException; - - /** - * Creates a new {@link TransactionLog}. - */ - public TransactionLog createLog(long timestamp) throws IOException; - - /** - * Returns the (non-qualified) names of available logs. - */ - public List<String> listLogs() throws IOException; - - /** - * Removes any transaction logs with a timestamp older than the given value. Logs must be removed based on timestamp - * to ensure we can fully recover state based on a given snapshot. - * @param timestamp The timestamp to delete up to. Logs with a timestamp less than this value will be removed. - * @throws IOException If an error occurs while removing logs. - */ - public void deleteLogsOlderThan(long timestamp) throws IOException; - - /** - * Create the directories required for the transaction state stage. - * @throws IOException If an error occurred during the creation of required directories for transaction state storage. - */ - public void setupStorage() throws IOException; - - /** - * Returns a string representation of the location used for persistence. - */ - public String getLocation(); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java deleted file mode 100644 index 68d17c3..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TransactionManager; - -import java.util.Collection; -import java.util.NavigableMap; - -/** - * Transaction Visibility state contains information required by TransactionProcessor CoProcessor - * to determine cell visibility. - */ -public interface TransactionVisibilityState { - - /** - * Returns the timestamp from when this snapshot was created. - */ - long getTimestamp(); - - /** - * Returns the read pointer at the time of the snapshot. - */ - long getReadPointer(); - - /** - * Returns the next write pointer at the time of the snapshot. - */ - long getWritePointer(); - - /** - * Returns the list of invalid write pointers at the time of the snapshot. - */ - Collection<Long> getInvalid(); - - /** - * Returns the map of write pointers to in-progress transactions at the time of the snapshot. - */ - NavigableMap<Long, TransactionManager.InProgressTx> getInProgress(); - - /** - * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to - * some of the currently in-progress transactions or to those that will be started <p> - * NOTE: the returned tx id can be invalid. - */ - long getVisibilityUpperBound(); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java b/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java deleted file mode 100644 index c94f0fe..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains interfaces and implementations for persisting transaction state. - */ -package co.cask.tephra.persist; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java deleted file mode 100644 index 664bfe3..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.rpc; - -/** - * Defines lifecycle interface for all rpc handlers. - */ -public interface RPCServiceHandler { - - void init() throws Exception; - - void destroy() throws Exception; -}
