[FLINK-3357] [core] Drop AbstractID#toShortString()

This closes #1601


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28feede7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28feede7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28feede7

Branch: refs/heads/master
Commit: 28feede7d40dc73ec861cf93393650b8b10afc3a
Parents: 5c47f38
Author: Ufuk Celebi <u...@apache.org>
Authored: Mon Feb 8 16:05:27 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 8 20:18:20 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  4 +--
 .../contrib/streaming/state/DbStateBackend.java | 35 ++++++++++----------
 .../streaming/state/DbStateBackendTest.java     | 10 +++---
 .../java/org/apache/flink/util/AbstractID.java  | 15 ---------
 ...taskExecutionAttemptAccumulatorsHandler.java |  2 +-
 .../InputGateDeploymentDescriptor.java          |  2 +-
 .../io/network/partition/ResultPartitionID.java |  2 +-
 7 files changed, 28 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index aaaeea4..eefa4a9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -81,11 +81,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
        }
 
        private File getDbPath(String stateName) {
-               return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName);
+               return new File(new File(new File(new File(dbBasePath), jobId.toString()), operatorIdentifier), stateName);
        }
 
        private String getCheckpointPath(String stateName) {
-               return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName;
+               return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 1d1ccd7..5162983 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,14 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -42,6 +34,14 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
 import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
 
 /**
@@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend {
 
        private transient Environment env;
 
+       private transient String appId;
+
        // ------------------------------------------------------
 
        private final DbBackendConfig dbConfig;
@@ -159,19 +161,14 @@ public class DbStateBackend extends AbstractStateBackend {
                                        // store the checkpoint id and timestamp for bookkeeping
                                        long handleId = rnd.nextLong();
 
-                                       // We use the ApplicationID here, because it is restored when
-                                       // the job is started from a savepoint (whereas the job ID
-                                       // changes with each submission).
-                                       String appIdShort = env.getApplicationID().toShortString();
-
                                        byte[] serializedState = InstantiationUtil.serializeObject(state);
-                                       dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement,
+                                       dbAdapter.setCheckpointInsertParams(appId, insertStatement,
                                                        checkpointID, timestamp, handleId,
                                                        serializedState);
 
                                        insertStatement.executeUpdate();
 
-                                       return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId,
+                                       return new DbStateHandle<>(appId, checkpointID, timestamp, handleId,
                                                        dbConfig, serializedState.length);
                                }
                        }, numSqlRetries, sqlRetrySleep);
@@ -253,6 +250,7 @@ public class DbStateBackend extends AbstractStateBackend {
 
                this.rnd = new Random();
                this.env = env;
+               this.appId = env.getApplicationID().toString().substring(0, 16);
 
                connections = dbConfig.createShardedConnection();
 
@@ -270,8 +268,8 @@ public class DbStateBackend extends AbstractStateBackend {
                if (nonPartitionedStateBackend == null) {
                        insertStatement = retry(new Callable<PreparedStatement>() {
                                public PreparedStatement call() throws SQLException {
-                                       dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst());
-                                       return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(),
+                                       dbAdapter.createCheckpointsTable(appId, getConnections().getFirst());
+                                       return dbAdapter.prepareCheckpointInsert(appId,
                                                        getConnections().getFirst());
                                }
                        }, numSqlRetries, sqlRetrySleep);
@@ -300,9 +298,10 @@ public class DbStateBackend extends AbstractStateBackend {
        @Override
        public void disposeAllStateForCurrentJob() throws Exception {
                if (nonPartitionedStateBackend == null) {
-                       dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst());
+                       dbAdapter.disposeAllStateForJob(appId, connections.getFirst());
                } else {
                        nonPartitionedStateBackend.disposeAllStateForCurrentJob();
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index d4883dd..91375e4 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -136,14 +136,15 @@ public class DbStateBackendTest {
 
                Environment env = new DummyEnvironment("test", 1, 0);
                backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
+               String appId = env.getApplicationID().toString().substring(0, 16);
 
                assertNotNull(backend.getConnections());
                assertTrue(
-                               isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+                               isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
                backend.disposeAllStateForCurrentJob();
                assertFalse(
-                               isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+                               isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
                backend.close();
 
                assertTrue(backend.getConnections().getFirst().isClosed());
@@ -153,6 +154,7 @@ public class DbStateBackendTest {
        public void testSerializableState() throws Exception {
                Environment env = new DummyEnvironment("test", 1, 0);
                DbStateBackend backend = new DbStateBackend(conf);
+               String appId = env.getApplicationID().toString().substring(0, 16);
 
                backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);
 
@@ -173,12 +175,12 @@ public class DbStateBackendTest {
                assertEquals(state2, handle2.getState(getClass().getClassLoader()));
                handle2.discardState();
 
-               assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+               assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
                assertEquals(state3, handle3.getState(getClass().getClassLoader()));
                handle3.discardState();
 
-               assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+               assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
                backend.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
index 5a57900..27276af 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -51,9 +51,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
        /** The memoized value returned by toString() */
        private String toString;
 
-       /** The memoized value returned by toShortString() */
-       private String toShortString;
-
        // --------------------------------------------------------------------------------------------
        
        /**
@@ -145,7 +142,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
                this.upperPart = in.readLong();
 
                this.toString = null;
-               this.toShortString = null;
        }
 
        @Override
@@ -188,17 +184,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 
                return this.toString;
        }
-
-       public String toShortString() {
-               if (this.toShortString == null) {
-                       final byte[] ba = new byte[SIZE_OF_LONG];
-                       longToByteArray(upperPart, ba, 0);
-
-                       this.toShortString = StringUtils.byteToHexString(ba);
-               }
-
-               return this.toShortString;
-       }
        
        @Override
        public int compareTo(AbstractID o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 0111e8c..14ccc0c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -48,7 +48,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
                gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
                gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
-               gen.writeStringField("id", execAttempt.getAttemptId().toShortString());
+               gen.writeStringField("id", execAttempt.getAttemptId().toString());
                
                gen.writeArrayFieldStart("user-accumulators");
                for (StringifiedAccumulatorResult acc : accs) {

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 77b072a..8a753c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -83,7 +83,7 @@ public class InputGateDeploymentDescriptor implements Serializable {
        public String toString() {
                return String.format("InputGateDeploymentDescriptor [result id: %s, " +
                                                "consumed subpartition index: %d, input channels: %s]",
-                               consumedResultId.toShortString(), consumedSubpartitionIndex,
+                               consumedResultId.toString(), consumedSubpartitionIndex,
                                Arrays.toString(inputChannels));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index af2970d..a18abde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -72,6 +72,6 @@ public final class ResultPartitionID implements Serializable {
 
        @Override
        public String toString() {
-               return partitionId.toShortString() + "@" + producerId.toShortString();
+               return partitionId.toString() + "@" + producerId.toString();
        }
 }

Reply via email to