carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317895515
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java
 ##########
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities to get/put data to {@link ByteBuffer}. All methods don't change
+ * byte buffer's position.
+ */
+@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"})
+public class ByteBufferUtils {
+
+       private static final boolean UNSAFE_AVAIL = UnsafeHelp.isAvailable();
+       private static final boolean UNSAFE_UNALIGNED = UnsafeHelp.unaligned();
+       private static final Field ACCESS_FIELD;
+
+       static {
+               try {
+                       ACCESS_FIELD = 
java.nio.Buffer.class.getDeclaredField("address");
+                       ACCESS_FIELD.setAccessible(true);
+               } catch (NoSuchFieldException e) {
+                       throw new RuntimeException("Failed to get address 
method from java.nio.Buffer", e);
+               }
+       }
+
+       /**
+        * Reads an int value at the given buffer's offset.
+        *
+        * @param buffer the given buffer
+        * @param offset the given buffer's offset
+        * @return int value at offset
+        */
+       public static int toInt(ByteBuffer buffer, int offset) {
+               if (UNSAFE_UNALIGNED) {
+                       return UnsafeHelp.toInt(buffer, offset);
+               } else {
+                       return buffer.getInt(offset);
+               }
+       }
+
+       /**
+        * Reads a long value at the given buffer's offset.
+        *
+        * @param buffer the given buffer
+        * @param offset the given buffer's offset
+        * @return long value at offset
+        */
+       public static long toLong(ByteBuffer buffer, int offset) {
+               if (UNSAFE_UNALIGNED) {
+                       return UnsafeHelp.toLong(buffer, offset);
+               } else {
+                       return buffer.getLong(offset);
+               }
+       }
+
+       /**
+        * Reads a short value at the given buffer's offset.
+        *
+        * @param buffer the given buffer
+        * @param offset the given buffer's offset
+        * @return short value at offset
+        */
+       public static short toShort(ByteBuffer buffer, int offset) {
+               if (UNSAFE_UNALIGNED) {
+                       return UnsafeHelp.toShort(buffer, offset);
+               } else {
+                       return buffer.getShort(offset);
+               }
+       }
+
+       public static byte toByte(ByteBuffer buffer, int offset) {
+               if (UnsafeHelp.isAvailable()) {
+                       return UnsafeHelp.toByte(buffer, offset);
+               } else {
+                       return buffer.get(offset);
+               }
+       }
+
+       public static void putInt(ByteBuffer buffer, int index, int val) {
+               if (UNSAFE_UNALIGNED) {
+                       UnsafeHelp.putInt(buffer, index, val);
+               } else {
+                       buffer.putInt(index, val);
+               }
+       }
+
+       public static void putLong(ByteBuffer buffer, int index, long val) {
+               if (UNSAFE_UNALIGNED) {
+                       UnsafeHelp.putLong(buffer, index, val);
+               } else {
+                       buffer.putLong(index, val);
+               }
+       }
+
+       /**
+        * Copy from one buffer to another from given offset. This will be 
absolute positional copying and
+        * won't affect the position of any of the buffers.
+        *
+        * @param in                the given buffer to read
+        * @param out               the given buffer of destination
+        * @param sourceOffset      the given buffer's offset of src
+        * @param destinationOffset the given buffer's offset of destination
+        * @param length            indicate data length
+        */
+       public static int copyFromBufferToBuffer(
 
 Review comment:
   The returned value is not needed and will change the method signature to 
remove it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to