zhoufek commented on a change in pull request #15505:
URL: https://github.com/apache/beam/pull/15505#discussion_r713089109



##########
File path: 
sdks/java/extensions/sbe/src/main/java/org/apache/beam/sdk/extensions/sbe/DirectByteBuffer.java
##########
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sbe;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.errorprone.annotations.DoNotCall;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import javax.annotation.Nonnull;
+import org.agrona.DirectBuffer;
+import org.agrona.MutableDirectBuffer;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A default implementation for {@link DirectBuffer} that is designed for use 
in Beam types.
+ *
+ * <p>This fully implements reading data. Since data cannot be modified, it 
makes no attempt to
+ * synchronize threads. If a concrete class needs to also implement {@link 
MutableDirectBuffer},
+ * guaranteeing thread-safety is the responsibility of that class.
+ *
+ * <p>To better guarantee PCollection safety, {@link DirectByteBuffer} does 
not implement the
+ * following {@link DirectBuffer} methods:
+ *
+ * <ul>
+ *   <li>Any {@code wrap} method.
+ *   <li>{@link DirectBuffer#addressOffset()}
+ *   <li>{@link DirectBuffer#wrapAdjustment()}
+ *   <li>{@link DirectBuffer#byteArray()}
+ *   <li>{@link DirectBuffer#byteBuffer()}
+ * </ul>
+ *
+ * <p>The last two can still be gotten via copy rather than by accessing the 
underlying buffer
+ * directly.
+ *
+ * <p>Instances can be created in either copy mode or view mode. Copy mode 
will copy any passed-in
+ * data to an internal buffer, and view mode will be backed directly by the 
passed-in data.
+ * Regardless of the mode, the 0 index of all reads and writes will be the 
{@link
+ * ByteBuffer#position()} value at the time of instance creation. This offset 
will not change even
+ * if the position is changed externally while in view mode. Reads are allowed 
in the range [0,
+ * limit), and writes are allowed in the range [0, capacity) regardless of 
mode. The limit can
+ * change if the implementation allows writing. The capacity is always the 
same as the passed-in
+ * {@link ByteBuffer}.
+ *
+ * <p>Implementations should use {@link DirectByteBuffer#DEFAULT_BYTE_ORDER} 
to determine the order
+ * of bytes. If a passed-in {@link ByteOrder} is different, then the bytes 
should be reversed before
+ * writes or after reads.
+ *
+ * <p>Implementations should implement {@link Object#equals(Object)} and 
{@link Object#hashCode()}.
+ * Implementations are required to implement {@link 
Comparable#compareTo(Object)}.
+ */
+@Experimental(Kind.EXTENSION)
+public abstract class DirectByteBuffer implements DirectBuffer {
+
+  protected static final String UNSAFE_FOR_PCOLLECTION = "Unsafe for 
PCollection";
+
+  /** Order of bytes in {@link DirectByteBuffer#buffer}. */
+  protected static final ByteOrder DEFAULT_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+  @Nonnull protected final ByteBuffer buffer;
+  protected final int offset;
+  protected int length; // Length might change if implementation allows 
writing to the buffer
+  protected final int capacity;
+
+  /**
+   * Creates a new instance that handles {@code buffer} according to {@code 
mode}.
+   *
+   * @param buffer the {@link ByteBuffer} to use to set the underlying data
+   * @param mode how {@code buffer} should be used to set the underlying data
+   */
+  protected DirectByteBuffer(@Nonnull ByteBuffer buffer, CreateMode mode) {
+    this.length = buffer.limit() - buffer.position();
+    if (mode == CreateMode.COPY) {
+      this.buffer = createCopyOfByteBuffer(buffer, buffer.position(), 
this.length);
+      this.offset = 0;
+    } else {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to