[FLINK-3357] [core] Drop AbstractID#toShortString() This closes #1601
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28feede7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28feede7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28feede7 Branch: refs/heads/master Commit: 28feede7d40dc73ec861cf93393650b8b10afc3a Parents: 5c47f38 Author: Ufuk Celebi <u...@apache.org> Authored: Mon Feb 8 16:05:27 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 8 20:18:20 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 4 +-- .../contrib/streaming/state/DbStateBackend.java | 35 ++++++++++---------- .../streaming/state/DbStateBackendTest.java | 10 +++--- .../java/org/apache/flink/util/AbstractID.java | 15 --------- ...taskExecutionAttemptAccumulatorsHandler.java | 2 +- .../InputGateDeploymentDescriptor.java | 2 +- .../io/network/partition/ResultPartitionID.java | 2 +- 7 files changed, 28 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index aaaeea4..eefa4a9 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -81,11 +81,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { } private File getDbPath(String stateName) { - return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName); + return new File(new File(new File(new File(dbBasePath), jobId.toString()), operatorIdentifier), stateName); } private String getCheckpointPath(String stateName) { - return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName; + return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index 1d1ccd7..5162983 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -17,14 +17,6 @@ package org.apache.flink.contrib.streaming.state; -import java.io.Serializable; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.Callable; - import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -42,6 +34,14 @@ import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.Callable; + import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; /** @@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend { private transient Environment env; + private transient String appId; + // ------------------------------------------------------ private final DbBackendConfig dbConfig; @@ -159,19 +161,14 @@ public class DbStateBackend extends AbstractStateBackend { // store the checkpoint id and timestamp for bookkeeping long handleId = rnd.nextLong(); - // We use the ApplicationID here, because it is restored when - // the job is started from a savepoint (whereas the job ID - // changes with each submission). - String appIdShort = env.getApplicationID().toShortString(); - byte[] serializedState = InstantiationUtil.serializeObject(state); - dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement, + dbAdapter.setCheckpointInsertParams(appId, insertStatement, checkpointID, timestamp, handleId, serializedState); insertStatement.executeUpdate(); - return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId, + return new DbStateHandle<>(appId, checkpointID, timestamp, handleId, dbConfig, serializedState.length); } }, numSqlRetries, sqlRetrySleep); @@ -253,6 +250,7 @@ public class DbStateBackend extends AbstractStateBackend { this.rnd = new Random(); this.env = env; + this.appId = env.getApplicationID().toString().substring(0, 16); connections = dbConfig.createShardedConnection(); @@ -270,8 +268,8 @@ public class DbStateBackend extends AbstractStateBackend { if (nonPartitionedStateBackend == null) { insertStatement = retry(new Callable<PreparedStatement>() { public PreparedStatement call() throws SQLException { - dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst()); - return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(), + dbAdapter.createCheckpointsTable(appId, getConnections().getFirst()); + return dbAdapter.prepareCheckpointInsert(appId, getConnections().getFirst()); } }, numSqlRetries, sqlRetrySleep); @@ -300,9 +298,10 @@ public class DbStateBackend extends AbstractStateBackend { @Override public void disposeAllStateForCurrentJob() throws Exception { if (nonPartitionedStateBackend == null) { - dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst()); + dbAdapter.disposeAllStateForJob(appId, connections.getFirst()); } else { nonPartitionedStateBackend.disposeAllStateForCurrentJob(); } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java index d4883dd..91375e4 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java @@ -136,14 +136,15 @@ public class DbStateBackendTest { Environment env = new DummyEnvironment("test", 1, 0); backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE); + String appId = env.getApplicationID().toString().substring(0, 16); assertNotNull(backend.getConnections()); assertTrue( - isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString())); + isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId)); backend.disposeAllStateForCurrentJob(); assertFalse( - isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString())); + isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId)); backend.close(); assertTrue(backend.getConnections().getFirst().isClosed()); @@ -153,6 +154,7 @@ public class DbStateBackendTest { public void testSerializableState() throws Exception { Environment env = new DummyEnvironment("test", 1, 0); DbStateBackend backend = new DbStateBackend(conf); + String appId = env.getApplicationID().toString().substring(0, 16); backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE); @@ -173,12 +175,12 @@ public class DbStateBackendTest { assertEquals(state2, handle2.getState(getClass().getClassLoader())); handle2.discardState(); - assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString())); + assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId)); assertEquals(state3, handle3.getState(getClass().getClassLoader())); handle3.discardState(); - assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString())); + assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId)); backend.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-core/src/main/java/org/apache/flink/util/AbstractID.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java index 5a57900..27276af 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java @@ -51,9 +51,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j /** The memoized value returned by toString() */ private String toString; - /** The memoized value returned by toShortString() */ - private String toShortString; - // -------------------------------------------------------------------------------------------- /** @@ -145,7 +142,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j this.upperPart = in.readLong(); this.toString = null; - this.toShortString = null; } @Override @@ -188,17 +184,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j return this.toString; } - - public String toShortString() { - if (this.toShortString == null) { - final byte[] ba = new byte[SIZE_OF_LONG]; - longToByteArray(upperPart, ba, 0); - - this.toShortString = StringUtils.byteToHexString(ba); - } - - return this.toShortString; - } @Override public int compareTo(AbstractID o) { http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index 0111e8c..14ccc0c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -48,7 +48,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex()); gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); - gen.writeStringField("id", execAttempt.getAttemptId().toShortString()); + gen.writeStringField("id", execAttempt.getAttemptId().toString()); gen.writeArrayFieldStart("user-accumulators"); for (StringifiedAccumulatorResult acc : accs) { http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java index 77b072a..8a753c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java @@ -83,7 +83,7 @@ public class InputGateDeploymentDescriptor implements Serializable { public String toString() { return String.format("InputGateDeploymentDescriptor [result id: %s, " + "consumed subpartition index: %d, input channels: %s]", - consumedResultId.toShortString(), consumedSubpartitionIndex, + consumedResultId.toString(), consumedSubpartitionIndex, Arrays.toString(inputChannels)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java index af2970d..a18abde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java @@ -72,6 +72,6 @@ public final class ResultPartitionID implements Serializable { @Override public String toString() { - return partitionId.toShortString() + "@" + producerId.toShortString(); + return partitionId.toString() + "@" + producerId.toString(); } }