[hotfix] Remove unnecessary job id from RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5b92f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5b92f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5b92f7 Branch: refs/heads/release-1.3 Commit: fe5b92f7e17a90315245706d5bc6868f9f729e56 Parents: 6c57354 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu May 18 17:48:30 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu May 18 23:16:27 2017 +0200 ---------------------------------------------------------------------- .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 -- .../flink/contrib/streaming/state/RocksDBStateBackend.java | 3 +-- .../test/query/KVStateRequestSerializerRocksDBTest.java | 8 +++----- 3 files changed, 4 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d0f73bf..88a759d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -18,7 +18,6 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -180,7 +179,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final String SST_FILE_SUFFIX = ".sst"; public RocksDBKeyedStateBackend( - JobID jobId, String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 55b8be2..2b70dcd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -300,10 +300,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { lazyInitializeForJob(env, operatorIdentifier); File instanceBasePath = - new File(getNextStoragePath(), "job-" + jobId.toString() + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); + new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); return new RocksDBKeyedStateBackend<>( - jobID, operatorIdentifier, env.getUserClassLoader(), instanceBasePath, http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 05f72c2..0f99afb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -19,7 +19,6 @@ package org.apache.flink.test.query; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -66,7 +65,6 @@ public final class KVStateRequestSerializerRocksDBTest { final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> { RocksDBKeyedStateBackend2( - final JobID jobId, final String operatorIdentifier, final ClassLoader userCodeClassLoader, final File instanceBasePath, @@ -78,7 +76,7 @@ public final class KVStateRequestSerializerRocksDBTest { final KeyGroupRange keyGroupRange, final ExecutionConfig executionConfig) throws Exception { - super(jobId, operatorIdentifier, userCodeClassLoader, + super(operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange, executionConfig, false); @@ -110,7 +108,7 @@ public final class KVStateRequestSerializerRocksDBTest { ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend = new RocksDBKeyedStateBackend2<>( - new JobID(), "no-op", + "no-op", ClassLoader.getSystemClassLoader(), temporaryFolder.getRoot(), dbOptions, @@ -147,7 +145,7 @@ public final class KVStateRequestSerializerRocksDBTest { ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = new RocksDBKeyedStateBackend<>( - new JobID(), "no-op", + "no-op", ClassLoader.getSystemClassLoader(), temporaryFolder.getRoot(), dbOptions,