carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325712863
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##########
 @@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for skip list.
+ */
+@SuppressWarnings("WeakerAccess")
+public class SkipListUtils {
+       static final long NIL_NODE = -1;
+       static final long HEAD_NODE = -2;
+       static final long NIL_VALUE_POINTER = -1;
+       static final int MAX_LEVEL = 255;
+       static final int DEFAULT_LEVEL = 32;
+       static final int BYTE_MASK = 0xFF;
+
+       /**
+        * Key space schema.
+        * - key meta
+        * -- int: level & status
+        * --   byte 0: level of node in skip list
+        * --   byte 1: node status
+        * --   byte 2: preserve
+        * --   byte 3: preserve
+        * -- int: length of key
+        * -- long: pointer to the newest value
+        * -- long: pointer to next node on level 0
+        * -- long[]: array of pointers to next node on different levels 
excluding level 0
+        * -- long[]: array of pointers to previous node on different levels 
excluding level 0
+        * - byte[]: data of key
+        */
+       static final int KEY_META_OFFSET = 0;
+       static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES;
+       static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES;
+       static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + 
Long.BYTES;
+       static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + 
Long.BYTES;
+
+
+       /**
+        * Pre-compute the offset of index for different levels to dismiss the 
duplicated
+        * computation at runtime.
+        */
+       private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+       /**
+        * Pre-compute the length of key meta for different levels to dismiss 
the duplicated
+        * computation at runtime.
+        */
+       private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+       static {
+               for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; 
i++) {
+                       INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = 
LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES;
+               }
+
+               for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) {
+                       KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 
* i * Long.BYTES;
+               }
+       }
+
+       /**
+        * Returns the level of the node.
+        *
+        * @param byteBuffer byte buffer for key space.
+        * @param offset offset of key space in the byte buffer.
+        */
+       public static int getLevel(ByteBuffer byteBuffer, int offset) {
+               return ByteBufferUtils.toInt(byteBuffer, offset + 
KEY_META_OFFSET) & BYTE_MASK;
+       }
+
+       /**
+        * Returns the status of the node.
+        *
+        * @param byteBuffer byte buffer for key space.
+        * @param offset offset of key space in the byte buffer.
+        */
+       public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) {
 
 Review comment:
   We thought that adding a `valueOf` to `NodeStatus` and switching on the 
result would perform worse, but a JMH check showed no big difference between 
"first calling valueOf and then comparing the enum" and "passing the byte and 
comparing it with getValue", so we will change to the more readable way.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to