gianm commented on code in PR #12848: URL: https://github.com/apache/druid/pull/12848#discussion_r936854874
########## processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; + +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * Channel backed by a 
byte stream that is continuously streamed in using {@link #addChunk}. The byte stream + * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}. + * + * This class is used by {@link org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking + * reads from a remote http server. + */ +public class ReadableByteChunksFrameChannel implements ReadableFrameChannel +{ + private static final Logger log = new Logger(ReadableByteChunksFrameChannel.class); + + /** + * Largest supported frame. Limit exists as a safeguard against streams with huge frame sizes. It is not expected + * that any legitimate frame will be this large: typical usage involves frames an order of magnitude smaller. + */ + private static final long MAX_FRAME_SIZE = 100_000_000; + + private static final int UNKNOWN_LENGTH = -1; + private static final int FRAME_MARKER_BYTES = Byte.BYTES; + private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES = + Byte.BYTES + Frame.COMPRESSED_FRAME_ENVELOPE_SIZE; + + private enum StreamPart + { + MAGIC, + FRAMES, + FOOTER + } + + private final Object lock = new Object(); + private final String id; + private final long bytesLimit; + + @GuardedBy("lock") + private final List<Either<Throwable, byte[]>> chunks = new ArrayList<>(); + + @GuardedBy("lock") + private SettableFuture<?> addChunkBackpressureFuture = null; + + @GuardedBy("lock") + private SettableFuture<?> readyForReadingFuture = null; + + @GuardedBy("lock") + private boolean noMoreWrites = false; + + @GuardedBy("lock") + private int positionInFirstChunk = 0; + + @GuardedBy("lock") + private long bytesBuffered = 0; + + @GuardedBy("lock") + private long bytesAdded = 0; + + @GuardedBy("lock") + private long nextCompressedFrameLength = UNKNOWN_LENGTH; + + @GuardedBy("lock") + private StreamPart streamPart = StreamPart.MAGIC; + + private ReadableByteChunksFrameChannel(String id, long bytesLimit) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.bytesLimit = 
bytesLimit; + } + + /** + * Create a channel that aims to limit its memory footprint to one frame. The channel exerts backpressure + * from {@link #addChunk} immediately once a full frame has been buffered. + */ + public static ReadableByteChunksFrameChannel create(final String id) + { + // Set byte limit to 1, so backpressure will be exerted as soon as we have a full frame buffered. + // (The bytesLimit is soft: it will be exceeded if needed to store a complete frame.) + return new ReadableByteChunksFrameChannel(id, 1); + } + + /** + * Adds a chunk of bytes. If this chunk forms a full frame, it will immediately become available for reading. + * Otherwise, the bytes will be buffered until a full frame is encountered. + * + * Returns an Optional that is absent if the amount of queued bytes is below this channel's limit accept, or present + * if the amount of queued bytes is at or above this channel's limit. If the Optional is present, you are politely + * requested to wait for the future to resolve before adding additional chunks. (This is not enforced; addChunk will + * continue to accept new chunks even if the channel is over its limit.) + * + * When done adding chunks call {@code doneWriting}. + */ + public Optional<ListenableFuture<?>> addChunk(final byte[] chunk) Review Comment: I figured that Optional was fine since the chunks are expected to be biggish and so the cost of Optional allocations is amortized. That being said, I'll change it to use `null` anyway, for a couple of reasons: - Chunks might be small enough that it matters: they aren't necessarily full frames. - Optionals aren't used much in this package; `null` is more common. So it's more harmonious with the rest of the package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
