[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930161#comment-15930161 ]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106674160
--- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+
+ private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+
+ /** The database in which all timers are stored */
+ private final RocksDB db;
+
+ /** The path where the RocksDB instance is located */
+ private final Path dbPath;
+
+ /**
+ * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
+ * partition's leader is stored in the heap. When the timers in a partition are changed, we
+ * change the partition's leader and update the heap accordingly.
+ */
+ private final int numPartitions;
+ private final PersistentTimerHeap eventTimeHeap;
+ private final PersistentTimerHeap processingTimeHeap;
+
+ private static int MAX_PARTITIONS = (1 << 16);
+
+ public RocksDBInternalTimerService(
+ int totalKeyGroups,
+ KeyGroupRange keyGroupRange,
+ KeyContext keyContext,
+ ProcessingTimeService processingTimeService,
+ Path dbPath) {
+
+ super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
+
+ this.dbPath = dbPath;
+
+ try {
+ FileSystem fileSystem = this.dbPath.getFileSystem();
+ if (fileSystem.exists(this.dbPath)) {
+ fileSystem.delete(this.dbPath, true);
+ }
+
+ fileSystem.mkdirs(dbPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while creating
directory for rocksdb timer service.", e);
+ }
+
+ ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator())
+ .setCompactionStyle(CompactionStyle.UNIVERSAL);
+ ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
+
+ DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setUseFsync(false)
+ .setDisableDataSync(true)
+ .setMaxOpenFiles(-1);
+
+ List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+
+ try {
+ this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while creating the
RocksDB instance.", e);
+ }
+
+ this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
--- End diff ---
I think we can safely initialize this to
`keyGroupRange.getNumberOfKeyGroups()`, which is required to be smaller than
the maximum max_parallelism. A preconditions check might be ok, though.
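For illustration, the suggested change might look roughly like this (a sketch against the constructor shown in the diff, not the committed code):

    // Sketch only: initialize directly from the key-group count and guard it
    // with a precondition, as suggested above. Names follow the diff.
    int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
    Preconditions.checkArgument(numberOfKeyGroups <= MAX_PARTITIONS,
        "The number of key groups must not exceed " + MAX_PARTITIONS + ".");
    this.numPartitions = numberOfKeyGroups;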
> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Now the only implementation of the internal timer service is
> HeapInternalTimerService, which stores all timers in memory. In cases
> where the number of keys is very large, the timer service consumes too much
> memory. An implementation which stores timers in RocksDB seems well suited to
> such cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways. When timers are triggered, we need
> to access timers in the order of timestamp. But when performing checkpoints,
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are stored
> together and kept sorted.
> Then we can maintain an in-memory heap which keeps the first timer of each key
> group to determine the next timer to trigger. When a key group's first timer is
> updated, we can efficiently update the heap.
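For illustration only (not code from the pull request), a minimal sketch of the composite key layout described above. It assumes non-negative timestamps, so that RocksDB's default byte-wise comparator orders the keys by (key group, timestamp):

    import java.nio.ByteBuffer;

    // Sketch: build the KEY_GROUP#TIMESTAMP#KEY byte layout. Big-endian encoding
    // keeps all timers of a key group contiguous and sorted by timestamp under
    // byte-wise comparison (for non-negative values).
    class TimerKeyLayout {
        static byte[] timerKey(int keyGroup, long timestamp, byte[] serializedKey) {
            return ByteBuffer.allocate(4 + 8 + serializedKey.length)
                .putInt(keyGroup)      // groups timers of the same key group together
                .putLong(timestamp)    // sorts timers within the key group by time
                .put(serializedKey)    // distinguishes timers with equal timestamps
                .array();
        }
    }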
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)