[
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930165#comment-15930165
]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106675210
--- Diff:
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends
InternalTimerService<K, N> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+
+	/** The database that stores all timers. */
+	private final RocksDB db;
+
+	/** The path at which the RocksDB instance is located. */
+	private final Path dbPath;
+
+	/**
+	 * In-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
+	 * partition's leader is stored in the heap. When the timers in a partition are changed,
+	 * the partition's leader is changed and the heap is updated accordingly.
+	 *
+	 * <p>{@code numPartitions} is the number of partitions per heap; the constructor caps it
+	 * at {@link #MAX_PARTITIONS}.
+	 */
+	private final int numPartitions;
+	private final PersistentTimerHeap eventTimeHeap;
+	private final PersistentTimerHeap processingTimeHeap;
+
+	/** Upper bound for {@link #numPartitions}. */
+	private static final int MAX_PARTITIONS = (1 << 16);
+
+ public RocksDBInternalTimerService(
+ int totalKeyGroups,
+ KeyGroupRange keyGroupRange,
+ KeyContext keyContext,
+ ProcessingTimeService processingTimeService,
+ Path dbPath) {
+
+ super(totalKeyGroups, keyGroupRange, keyContext,
processingTimeService);
+
+ this.dbPath = dbPath;
+
+ try {
+ FileSystem fileSystem = this.dbPath.getFileSystem();
+ if (fileSystem.exists(this.dbPath)) {
+ fileSystem.delete(this.dbPath, true);
+ }
+
+ fileSystem.mkdirs(dbPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while creating
directory for rocksdb timer service.", e);
+ }
+
+ ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator())
+ .setCompactionStyle(CompactionStyle.UNIVERSAL);
+ ColumnFamilyDescriptor defaultColumnDescriptor = new
ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
+
+ DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setUseFsync(false)
+ .setDisableDataSync(true)
+ .setMaxOpenFiles(-1);
+
+ List<ColumnFamilyHandle> columnFamilyHandles = new
ArrayList<>(1);
+
+ try {
+ this.db = RocksDB.open(dbOptions, dbPath.getPath(),
Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while creating the
RocksDB instance.", e);
+ }
+
+ this.numPartitions =
Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
+
+ ColumnFamilyHandle eventTimeColumnFamilyHandle;
+ ColumnFamilyHandle processingTimeColumnFamilyHandle;
+ try {
+ ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor
= new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
+ ColumnFamilyDescriptor
processingTimeColumnFamilyDescriptor = new
ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
+ eventTimeColumnFamilyHandle =
db.createColumnFamily(eventTimeColumnFamilyDescriptor);
+ processingTimeColumnFamilyHandle =
db.createColumnFamily(processingTimeColumnFamilyDescriptor);
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while creating the
column families.", e);
+ }
+
+ this.processingTimeHeap = new
PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
+ this.eventTimeHeap = new PersistentTimerHeap(numPartitions,
eventTimeColumnFamilyHandle);
+ }
+
+	// ------------------------------------------------------------------------
+	//  InternalTimerService Implementation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Rebuilds the event-time and processing-time heaps from RocksDB, then schedules a
+	 * processing-time callback for the heap's top timer, if there is one.
+	 */
+	@Override
+	public void start() {
+		eventTimeHeap.initialize();
+		processingTimeHeap.initialize();
+
+		final Tuple4<Integer, Long, K, N> earliest = processingTimeHeap.top();
+		if (earliest == null) {
+			// No processing-time timers are stored, so there is nothing to schedule.
+			return;
+		}
+		nextTimer = processingTimeService.registerTimer(earliest.f1, this);
+	}
+
+ @Override
+ public void close() {
+ if (db != null) {
+ db.close();
+ }
+
+ if (dbPath != null) {
+ try {
+ FileSystem fileSystem = dbPath.getFileSystem();
+ if (fileSystem.exists(dbPath)) {
+ fileSystem.delete(dbPath, true);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error while
cleaning directory for rocksdb timer service.", e);
+ }
+ }
+ }
+
+	/**
+	 * Fires every event-time timer returned by {@code eventTimeHeap.peek(timestamp)}, making
+	 * each timer's key the current key before handing the timer to the trigger target.
+	 */
+	@Override
+	public void onEventTime(long timestamp) throws Exception {
+		for (Tuple4<Integer, Long, K, N> due : eventTimeHeap.peek(timestamp)) {
+			final K key = due.f2;
+			keyContext.setCurrentKey(key);
+			triggerTarget.onEventTime(new InternalTimer<>(due.f1, key, due.f3));
+		}
+	}
+
+	/**
+	 * Fires every processing-time timer returned by {@code processingTimeHeap.peek(timestamp)}.
+	 * If no callback scheduled a new processing-time timer meanwhile, a callback for the
+	 * heap's top timer is scheduled afterwards.
+	 */
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		// Reset before firing: the trigger callbacks may set nextTimer again, and the check
+		// after the loop relies on that.
+		nextTimer = null;
+
+		for (Tuple4<Integer, Long, K, N> due : processingTimeHeap.peek(timestamp)) {
+			keyContext.setCurrentKey(due.f2);
+			triggerTarget.onProcessingTime(new InternalTimer<>(due.f1, due.f2, due.f3));
+		}
+
+		if (nextTimer != null) {
+			return;
+		}
+
+		final Tuple4<Integer, Long, K, N> head = processingTimeHeap.top();
+		if (head != null) {
+			nextTimer = processingTimeService.registerTimer(head.f1, this);
+		}
+	}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void registerProcessingTimeTimer(N namespace, long time) {
+ boolean isNewHead =
processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
+
+ if (isNewHead) {
+ if (nextTimer != null) {
+ nextTimer.cancel(false);
+ }
+
+ Tuple4<Integer, Long, K, N> newHeadTimer =
processingTimeHeap.top();
+ if (newHeadTimer == null || newHeadTimer.f1 != time) {
+ throw new IllegalStateException();
--- End diff --
The exception should contain a message about what went wrong. This should
also be fixed in similar cases in this class.
> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Currently, the only implementation of the internal timer service is
> HeapInternalTimerService, which stores all timers in memory. When the number
> of keys is very large, this timer service consumes too much memory. An
> implementation that stores timers in RocksDB appears to be a good fit for
> these cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways. When timers are triggered, we need
> to access timers in the order of timestamp. But when performing checkpoints,
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put
> together and are sorted.
> Then we can deploy an in-memory heap which keeps the first timer of each key
> group to get the next timer to trigger. When a key group's first timer is
> updated, we can efficiently update the heap.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)