This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new c29af139a8f [FLINK-31743][statebackend/rocksdb] disable rocksdb log
relocating when instance path too long
c29af139a8f is described below
commit c29af139a8f3055c96c641016a31cd3a92ca022a
Author: wangfeifan <[email protected]>
AuthorDate: Fri Apr 21 23:43:31 2023 +0800
[FLINK-31743][statebackend/rocksdb] disable rocksdb log relocating when
instance path too long
---
.../state/EmbeddedRocksDBStateBackend.java | 10 +++--
.../state/RocksDBKeyedStateBackendBuilder.java | 6 ++-
.../streaming/state/RocksDBResourceContainer.java | 45 ++++++++++++++++++++--
.../streaming/state/RocksDBStateBackend.java | 2 +-
.../state/EmbeddedRocksDBStateBackendTest.java | 5 ++-
.../state/RocksDBStateBackendConfigTest.java | 19 ++++++++-
6 files changed, 74 insertions(+), 13 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index e65d925f453..94ea783f670 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -473,7 +473,9 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
}
final RocksDBResourceContainer resourceContainer =
createOptionsAndResourceContainer(
- sharedResources,
nativeMetricOptions.isStatisticsEnabled());
+ sharedResources,
+ instanceBasePath,
+ nativeMetricOptions.isStatisticsEnabled());
ExecutionConfig executionConfig = env.getExecutionConfig();
StreamCompressionDecorator keyGroupCompressionDecorator =
@@ -875,13 +877,14 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
}
@VisibleForTesting
- RocksDBResourceContainer createOptionsAndResourceContainer() {
- return createOptionsAndResourceContainer(null, false);
+ RocksDBResourceContainer createOptionsAndResourceContainer(@Nullable File
instanceBasePath) {
+ return createOptionsAndResourceContainer(null, instanceBasePath,
false);
}
@VisibleForTesting
private RocksDBResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<RocksDBSharedResources>
sharedResources,
+ @Nullable File instanceBasePath,
boolean enableStatistics) {
return new RocksDBResourceContainer(
@@ -889,6 +892,7 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
predefinedOptions != null ? predefinedOptions :
PredefinedOptions.DEFAULT,
rocksDbOptionsFactory,
sharedResources,
+ instanceBasePath,
enableStatistics);
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 7007f1b5463..3df43d7224e 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -166,7 +166,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
this.columnFamilyOptionsFactory =
Preconditions.checkNotNull(columnFamilyOptionsFactory);
this.optionsContainer = optionsContainer;
this.instanceBasePath = instanceBasePath;
- this.instanceRocksDBPath = new File(instanceBasePath,
DB_INSTANCE_DIR_STRING);
+ this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath);
this.metricGroup = metricGroup;
this.enableIncrementalCheckpointing = false;
this.nativeMetricOptions = new RocksDBNativeMetricOptions();
@@ -264,6 +264,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
return this;
}
+ public static File getInstanceRocksDBPath(File instanceBasePath) {
+ return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
+ }
+
private static void checkAndCreateDirectory(File directory) throws
IOException {
if (directory.exists()) {
if (!directory.isDirectory()) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index ed652980d04..5bf2f6e63f5 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -59,6 +59,11 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
public final class RocksDBResourceContainer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBResourceContainer.class);
+ // the filename length limit is 255 on most operating systems
+ private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 -
"_LOG".length();
+
+ @Nullable private final File instanceRocksDBPath;
+
/** The configurations from file. */
private final ReadableConfig configuration;
@@ -82,13 +87,13 @@ public final class RocksDBResourceContainer implements
AutoCloseable {
@VisibleForTesting
public RocksDBResourceContainer() {
- this(new Configuration(), PredefinedOptions.DEFAULT, null, null,
false);
+ this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null,
false);
}
@VisibleForTesting
public RocksDBResourceContainer(
PredefinedOptions predefinedOptions, @Nullable
RocksDBOptionsFactory optionsFactory) {
- this(new Configuration(), predefinedOptions, optionsFactory, null,
false);
+ this(new Configuration(), predefinedOptions, optionsFactory, null,
null, false);
}
@VisibleForTesting
@@ -96,7 +101,7 @@ public final class RocksDBResourceContainer implements
AutoCloseable {
PredefinedOptions predefinedOptions,
@Nullable RocksDBOptionsFactory optionsFactory,
@Nullable OpaqueMemoryResource<RocksDBSharedResources>
sharedResources) {
- this(new Configuration(), predefinedOptions, optionsFactory,
sharedResources, false);
+ this(new Configuration(), predefinedOptions, optionsFactory,
sharedResources, null, false);
}
public RocksDBResourceContainer(
@@ -105,11 +110,33 @@ public final class RocksDBResourceContainer implements
AutoCloseable {
@Nullable RocksDBOptionsFactory optionsFactory,
@Nullable OpaqueMemoryResource<RocksDBSharedResources>
sharedResources,
boolean enableStatistics) {
+ this(
+ configuration,
+ predefinedOptions,
+ optionsFactory,
+ sharedResources,
+ null,
+ enableStatistics);
+ }
+
+ RocksDBResourceContainer(
+ ReadableConfig configuration,
+ PredefinedOptions predefinedOptions,
+ @Nullable RocksDBOptionsFactory optionsFactory,
+ @Nullable OpaqueMemoryResource<RocksDBSharedResources>
sharedResources,
+ @Nullable File instanceBasePath,
+ boolean enableStatistics) {
this.configuration = configuration;
this.predefinedOptions = checkNotNull(predefinedOptions);
this.optionsFactory = optionsFactory;
this.sharedResources = sharedResources;
+
+ this.instanceRocksDBPath =
+ instanceBasePath != null
+ ?
RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath)
+ : null;
+
this.enableStatistics = enableStatistics;
this.handlesToClose = new ArrayList<>();
}
@@ -314,7 +341,17 @@ public final class RocksDBResourceContainer implements
AutoCloseable {
String logDir = internalGetOption(RocksDBConfigurableOptions.LOG_DIR);
if (logDir == null || logDir.isEmpty()) {
- relocateDefaultDbLogDir(currentOptions);
+ if (instanceRocksDBPath == null
+ || instanceRocksDBPath.getAbsolutePath().length()
+ <= INSTANCE_PATH_LENGTH_LIMIT) {
+ relocateDefaultDbLogDir(currentOptions);
+ } else {
+ // disable log relocate when instance path length exceeds
limit to prevent rocksdb
+ // log file creation failure, details in FLINK-31743
+ LOG.warn(
+ "RocksDB instance path length exceeds limit : {},
disable log relocate.",
+ instanceRocksDBPath);
+ }
} else {
currentOptions.setDbLogDir(logDir);
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 20be1e085f0..1275a5aa928 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -575,7 +575,7 @@ public class RocksDBStateBackend extends
AbstractManagedMemoryStateBackend
@VisibleForTesting
RocksDBResourceContainer createOptionsAndResourceContainer() {
- return rocksDBStateBackend.createOptionsAndResourceContainer();
+ return rocksDBStateBackend.createOptionsAndResourceContainer(null);
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 46788406c16..4d6b533d662 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -86,7 +86,6 @@ import java.util.Queue;
import java.util.concurrent.RunnableFuture;
import static junit.framework.TestCase.assertNotNull;
-import static
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.DB_INSTANCE_DIR_STRING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,7 +147,9 @@ public class EmbeddedRocksDBStateBackendTest
private final RocksDBResourceContainer optionsContainer = new
RocksDBResourceContainer();
public void prepareRocksDB() throws Exception {
- String dbPath = new File(TEMP_FOLDER.newFolder(),
DB_INSTANCE_DIR_STRING).getAbsolutePath();
+ String dbPath =
+
RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(TEMP_FOLDER.newFolder())
+ .getAbsolutePath();
ColumnFamilyOptions columnOptions =
optionsContainer.getColumnOptions();
ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index e16d1504812..36e0d3c2bce 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -101,7 +101,7 @@ public class RocksDBStateBackendConfigTest {
final File logFile = File.createTempFile(getClass().getSimpleName() +
"-", ".log");
// set the environment variable 'log.file' with the Flink log file
location
System.setProperty("log.file", logFile.getPath());
- try (RocksDBResourceContainer container =
backend.createOptionsAndResourceContainer()) {
+ try (RocksDBResourceContainer container =
backend.createOptionsAndResourceContainer(null)) {
assertEquals(
RocksDBConfigurableOptions.LOG_LEVEL.defaultValue(),
container.getDbOptions().infoLogLevel());
@@ -109,6 +109,19 @@ public class RocksDBStateBackendConfigTest {
} finally {
logFile.delete();
}
+
+ StringBuilder longInstanceBasePath =
+ new StringBuilder(tempFolder.newFolder().getAbsolutePath());
+ while (longInstanceBasePath.length() < 255) {
+ longInstanceBasePath.append("/append-for-long-path");
+ }
+ try (RocksDBResourceContainer container =
+ backend.createOptionsAndResourceContainer(
+ new File(longInstanceBasePath.toString()))) {
+ assertTrue(container.getDbOptions().dbLogDir().isEmpty());
+ } finally {
+ logFile.delete();
+ }
}
// ------------------------------------------------------------------------
@@ -531,7 +544,7 @@ public class RocksDBStateBackendConfigTest {
try (RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(
- configuration, PredefinedOptions.DEFAULT, null,
null, false)) {
+ configuration, PredefinedOptions.DEFAULT, null,
null, null, false)) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(-1, dbOptions.maxOpenFiles());
@@ -614,6 +627,7 @@ public class RocksDBStateBackendConfigTest {
PredefinedOptions.SPINNING_DISK_OPTIMIZED,
null,
null,
+ null,
false)) {
final ColumnFamilyOptions columnFamilyOptions =
optionsContainer.getColumnOptions();
@@ -627,6 +641,7 @@ public class RocksDBStateBackendConfigTest {
PredefinedOptions.SPINNING_DISK_OPTIMIZED,
null,
null,
+ null,
false)) {
final ColumnFamilyOptions columnFamilyOptions =
optionsContainer.getColumnOptions();