Zakelly commented on code in PR #24682: URL: https://github.com/apache/flink/pull/24682#discussion_r1574434930
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.memory.OpaqueMemoryResource; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.Preconditions; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** Utils for ForSt Operations. */ +public class ForStOperationUtils { + + /** + * The name of the merge operator in ForSt. Do not change except you know exactly what you do. + */ + public static final String MERGE_OPERATOR_NAME = "stringappendtest"; Review Comment: Is this still needed? ########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.memory.OpaqueMemoryResource; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.Cache; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.Filter; +import org.rocksdb.FlinkEnv; +import org.rocksdb.IndexType; +import org.rocksdb.PlainTableConfig; +import org.rocksdb.ReadOptions; +import org.rocksdb.Statistics; +import org.rocksdb.TableFormatConfig; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; + +/** + * The container for ForSt resources, including option factory and shared resource among instances. + * + * <p>This should be the only entrance for ForStStateBackend to get ForSt options, and should be + * properly (and necessarily) closed to prevent resource leak. + */ +public final class ForStResourceContainer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class); + + private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG"; + + // the filename length limit is 255 on most operating systems + private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); + + private static final String DB_DIR_STRING = "db"; + + @Nullable private final URI remoteBasePath; + + @Nullable private final URI remoteForStPath; + + @Nullable private final File localBasePath; + + @Nullable private final File localForStPath; + + /** The configurations from file. */ + private final ReadableConfig configuration; + + /** The options factory to create the ForSt options. */ + @Nullable private final ForStOptionsFactory optionsFactory; + + /** + * The shared resource among ForSt instances. This resource is not part of the 'handlesToClose', + * because the handles to close are closed quietly, whereas for this one, we want exceptions to + * be reported. + */ + @Nullable private final OpaqueMemoryResource<ForStSharedResources> sharedResources; + + private final boolean enableStatistics; + + /** The handles to be closed when the container is closed. */ + private final ArrayList<AutoCloseable> handlesToClose; + + @Nullable private Path relocatedDbLogBaseDir; + + @VisibleForTesting + public ForStResourceContainer() { + this(new Configuration(), null, null, null, null, false); + } + + @VisibleForTesting + public ForStResourceContainer(@Nullable ForStOptionsFactory optionsFactory) { + this(new Configuration(), optionsFactory, null, null, null, false); + } + + @VisibleForTesting + public ForStResourceContainer( + @Nullable ForStOptionsFactory optionsFactory, + @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources) { + this(new Configuration(), optionsFactory, sharedResources, null, null, false); + } + + public ForStResourceContainer( + ReadableConfig configuration, + @Nullable ForStOptionsFactory optionsFactory, + @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, + @Nullable File localBasePath, + @Nullable URI remoteBasePath, + boolean enableStatistics) { + + this.configuration = configuration; + this.optionsFactory = optionsFactory; + this.sharedResources = sharedResources; + + this.localBasePath = localBasePath; + this.localForStPath = localBasePath != null ? new File(localBasePath, DB_DIR_STRING) : null; + this.remoteBasePath = remoteBasePath; + this.remoteForStPath = + remoteBasePath != null ? remoteBasePath.resolve(DB_DIR_STRING) : null; + + this.enableStatistics = enableStatistics; + this.handlesToClose = new ArrayList<>(); + } + + /** Gets the ForSt {@link DBOptions} to be used for ForSt instances. */ + public DBOptions getDbOptions() { + // initial options from common profile + DBOptions opt = createBaseCommonDBOptions(); + handlesToClose.add(opt); + + // load configurable options on top of pre-defined profile + setDBOptionsFromConfigurableOptions(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createDBOptions(opt, handlesToClose); + } + + // add necessary default options + opt = opt.setCreateIfMissing(true).setAvoidFlushDuringShutdown(true); + + // if sharedResources is non-null, use the write buffer manager from it. + if (sharedResources != null) { + opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager()); + } + + if (enableStatistics) { + Statistics statistics = new Statistics(); + opt.setStatistics(statistics); + handlesToClose.add(statistics); + } + + if (remoteForStPath != null) { Review Comment: What if the `remoteForStPath` is null? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
