This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push: new ac694375da feat: implement OperatorInputStream and OperatorOutputStream (#4626) ac694375da is described below commit ac694375da955ea6d3461aee0bfafd5a5d774541 Author: tison <wander4...@gmail.com> AuthorDate: Sun May 19 22:52:26 2024 +0800 feat: implement OperatorInputStream and OperatorOutputStream (#4626) Signed-off-by: tison <wander4...@gmail.com> --- bindings/java/pom.xml | 12 +++ bindings/java/src/blocking_operator.rs | 1 + bindings/java/src/lib.rs | 2 + .../java/org/apache/opendal/BlockingOperator.java | 28 ++++--- .../org/apache/opendal/OperatorInputStream.java | 71 ++++++++++++++++ .../org/apache/opendal/OperatorOutputStream.java | 82 +++++++++++++++++++ bindings/java/src/operator_input_stream.rs | 92 +++++++++++++++++++++ bindings/java/src/operator_output_stream.rs | 95 ++++++++++++++++++++++ .../test/OperatorInputOutputStreamTest.java | 66 +++++++++++++++ core/src/raw/ops.rs | 2 +- core/src/types/blocking_read/blocking_reader.rs | 2 +- core/src/types/blocking_read/std_bytes_iterator.rs | 26 ++++-- 12 files changed, 461 insertions(+), 18 deletions(-) diff --git a/bindings/java/pom.xml b/bindings/java/pom.xml index aec26bab55..52d77ee94f 100644 --- a/bindings/java/pom.xml +++ b/bindings/java/pom.xml @@ -67,6 +67,7 @@ <!-- library dependencies --> <assertj.version>3.23.1</assertj.version> + <commons-io.version>2.16.1</commons-io.version> <dotenv.version>2.3.2</dotenv.version> <lombok.version>1.18.30</lombok.version> <slf4j.version>2.0.7</slf4j.version> @@ -110,6 +111,12 @@ <artifactId>dotenv-java</artifactId> <version>${dotenv.version}</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.httpcomponents.client5</groupId> <artifactId>httpclient5</artifactId> @@ -151,6 +158,11 @@ <artifactId>dotenv-java</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.httpcomponents.client5</groupId> <artifactId>httpclient5</artifactId> diff --git a/bindings/java/src/blocking_operator.rs b/bindings/java/src/blocking_operator.rs index 3234cf84fc..88d92d1a40 100644 --- a/bindings/java/src/blocking_operator.rs +++ b/bindings/java/src/blocking_operator.rs @@ -25,6 +25,7 @@ use jni::sys::jobject; use jni::sys::jobjectArray; use jni::sys::jsize; use jni::JNIEnv; + use opendal::BlockingOperator; use crate::convert::jstring_to_string; diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index c9950a4796..3dde906222 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -38,6 +38,8 @@ mod error; mod executor; mod layer; mod operator; +mod operator_input_stream; +mod operator_output_stream; mod utility; pub(crate) type Result<T> = std::result::Result<T, error::Error>; diff --git a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java index ffb0b60420..833e6ac04b 100644 --- a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java +++ b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java @@ -69,10 +69,18 @@ public class BlockingOperator extends NativeObject { write(nativeHandle, path, content); } + public OperatorOutputStream createOutputStream(String path) { + return new OperatorOutputStream(this, path); + } + public byte[] read(String path) { return read(nativeHandle, path); } + public OperatorInputStream createInputStream(String path) { + return new OperatorInputStream(this, path); + } + public void delete(String path) { delete(nativeHandle, path); } @@ -104,23 +112,23 @@ public class BlockingOperator extends NativeObject { @Override protected native void disposeInternal(long handle); - private static native long duplicate(long nativeHandle); + private static native long duplicate(long op); - private static native void write(long nativeHandle, String path, byte[] content); + private static native void write(long op, String path, byte[] content); - private static native byte[] read(long nativeHandle, String path); + private static native byte[] read(long op, String path); - private static native void delete(long nativeHandle, String path); + private static native void delete(long op, String path); - private static native Metadata stat(long nativeHandle, String path); + private static native Metadata stat(long op, String path); - private static native long createDir(long nativeHandle, String path); + private static native long createDir(long op, String path); - private static native long copy(long nativeHandle, String sourcePath, String targetPath); + private static native long copy(long op, String sourcePath, String targetPath); - private static native long rename(long nativeHandle, String sourcePath, String targetPath); + private static native long rename(long op, String sourcePath, String targetPath); - private static native void removeAll(long nativeHandle, String path); + private static native void removeAll(long op, String path); - private static native Entry[] list(long nativeHandle, String path); + private static native Entry[] list(long op, String path); } diff --git a/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java b/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java new file mode 100644 index 0000000000..a9eb6731d8 --- /dev/null +++ b/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal; + +import java.io.IOException; +import java.io.InputStream; + +public class OperatorInputStream extends InputStream { + private static class Reader extends NativeObject { + private Reader(long nativeHandle) { + super(nativeHandle); + } + + @Override + protected void disposeInternal(long handle) { + disposeReader(handle); + } + } + + private final Reader reader; + + private int offset = 0; + private byte[] bytes = new byte[0]; + + public OperatorInputStream(BlockingOperator operator, String path) { + final long op = operator.nativeHandle; + this.reader = new Reader(constructReader(op, path)); + } + + @Override + public int read() throws IOException { + if (bytes != null && offset >= bytes.length) { + bytes = readNextBytes(reader.nativeHandle); + offset = 0; + } + + if (bytes != null) { + return bytes[offset++] & 0xFF; + } + + return -1; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + private static native long constructReader(long op, String path); + + private static native long disposeReader(long reader); + + private static native byte[] readNextBytes(long reader); +} diff --git a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java new file mode 100644 index 0000000000..119346595e --- /dev/null +++ b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +public class OperatorOutputStream extends OutputStream { + private static class Writer extends NativeObject { + private Writer(long nativeHandle) { + super(nativeHandle); + } + + @Override + protected void disposeInternal(long handle) { + disposeWriter(handle); + } + } + + private static final int MAX_BYTES = 16384; + + private final Writer writer; + private final byte[] bytes = new byte[MAX_BYTES]; + + private int offset = 0; + + public OperatorOutputStream(BlockingOperator operator, String path) { + final long op = operator.nativeHandle; + this.writer = new Writer(constructWriter(op, path)); + } + + @Override + public void write(int b) throws IOException { + bytes[offset++] = (byte) b; + if (offset >= MAX_BYTES) { + flush(); + } + } + + @Override + public void flush() throws IOException { + if (offset > MAX_BYTES) { + throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES); + } else if (offset < MAX_BYTES) { + final byte[] bytes = Arrays.copyOf(this.bytes, offset); + writeBytes(writer.nativeHandle, bytes); + } else { + writeBytes(writer.nativeHandle, bytes); + } + offset = 0; + } + + @Override + public void close() throws IOException { + flush(); + writer.close(); + } + + private static native long constructWriter(long op, String path); + + private static native long disposeWriter(long writer); + + private static native byte[] writeBytes(long writer, byte[] bytes); +} diff --git a/bindings/java/src/operator_input_stream.rs b/bindings/java/src/operator_input_stream.rs new file mode 100644 index 0000000000..fd53237bf3 --- /dev/null +++ b/bindings/java/src/operator_input_stream.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::convert::jstring_to_string; +use jni::objects::{JByteArray, JClass, JObject, JString}; +use jni::sys::{jbyteArray, jlong}; +use jni::JNIEnv; +use opendal::{BlockingOperator, StdBytesIterator}; + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_constructReader( + mut env: JNIEnv, + _: JClass, + op: *mut BlockingOperator, + path: JString, +) -> jlong { + intern_construct_reader(&mut env, &mut *op, path).unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +fn intern_construct_reader( + env: &mut JNIEnv, + op: &mut BlockingOperator, + path: JString, +) -> crate::Result<jlong> { + let path = jstring_to_string(env, &path)?; + let reader = op.reader(&path)?.into_bytes_iterator(..); + Ok(Box::into_raw(Box::new(reader)) as jlong) +} + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_disposeReader( + _: JNIEnv, + _: JClass, + reader: *mut StdBytesIterator, +) { + drop(Box::from_raw(reader)); +} + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_readNextBytes( + mut env: JNIEnv, + _: JClass, + reader: *mut StdBytesIterator, +) -> jbyteArray { + intern_read_next_bytes(&mut env, &mut *reader).unwrap_or_else(|e| { + e.throw(&mut env); + JByteArray::default().into_raw() + }) +} + +fn intern_read_next_bytes( + env: &mut JNIEnv, + reader: &mut StdBytesIterator, +) -> crate::Result<jbyteArray> { + match reader + .next() + .transpose() + .map_err(|err| opendal::Error::new(opendal::ErrorKind::Unexpected, &err.to_string()))? + { + None => Ok(JObject::null().into_raw()), + Some(content) => { + let result = env.byte_array_from_slice(&content)?; + Ok(result.into_raw()) + } + } +} diff --git a/bindings/java/src/operator_output_stream.rs b/bindings/java/src/operator_output_stream.rs new file mode 100644 index 0000000000..7478fe67af --- /dev/null +++ b/bindings/java/src/operator_output_stream.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use jni::objects::{JByteArray, JClass, JString}; +use jni::sys::jlong; +use jni::JNIEnv; + +use opendal::{BlockingOperator, BlockingWriter}; + +use crate::convert::jstring_to_string; + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_constructWriter( + mut env: JNIEnv, + _: JClass, + op: *mut BlockingOperator, + path: JString, +) -> jlong { + intern_construct_write(&mut env, &mut *op, path).unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +fn intern_construct_write( + env: &mut JNIEnv, + op: &mut BlockingOperator, + path: JString, +) -> crate::Result<jlong> { + let path = jstring_to_string(env, &path)?; + let writer = op.writer(&path)?; + Ok(Box::into_raw(Box::new(writer)) as jlong) +} + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_disposeWriter( + mut env: JNIEnv, + _: JClass, + writer: *mut BlockingWriter, +) { + let mut writer = Box::from_raw(writer); + intern_dispose_write(&mut writer).unwrap_or_else(|e| { + e.throw(&mut env); + }) +} + +fn intern_dispose_write(writer: &mut BlockingWriter) -> crate::Result<()> { + writer.close()?; + Ok(()) +} + +/// # Safety +/// +/// This function should not be called before the Operator is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_writeBytes( + mut env: JNIEnv, + _: JClass, + writer: *mut BlockingWriter, + content: JByteArray, +) { + intern_write_bytes(&mut env, &mut *writer, content).unwrap_or_else(|e| { + e.throw(&mut env); + }) +} + +fn intern_write_bytes( + env: &mut JNIEnv, + writer: &mut BlockingWriter, + content: JByteArray, +) -> crate::Result<()> { + let content = env.convert_byte_array(content)?; + writer.write(content)?; + Ok(()) +} diff --git a/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java b/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java new file mode 100644 index 0000000000..8cacbe57ff --- /dev/null +++ b/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal.test; + +import static org.assertj.core.api.Assertions.assertThat; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.opendal.BlockingOperator; +import org.apache.opendal.OperatorInputStream; +import org.apache.opendal.OperatorOutputStream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class OperatorInputOutputStreamTest { + @TempDir + private static Path tempDir; + + @Test + void testReadWriteWithStream() throws Exception { + final Map<String, String> conf = new HashMap<>(); + conf.put("root", tempDir.toString()); + + try (final BlockingOperator op = BlockingOperator.of("fs", conf)) { + final String path = "OperatorInputOutputStreamTest.txt"; + final long multi = 1024 * 1024; + + try (final OperatorOutputStream os = op.createOutputStream(path)) { + for (long i = 0; i < multi; i++) { + os.write("[content] OperatorInputStreamTest\n".getBytes()); + } + } + + try (final OperatorInputStream is = op.createInputStream(path)) { + final Stream<String> lines = new BufferedReader(new InputStreamReader(is)).lines(); + final AtomicLong count = new AtomicLong(); + lines.forEach((line) -> { + assertThat(line).isEqualTo("[content] OperatorInputStreamTest"); + count.incrementAndGet(); + }); + assertThat(count.get()).isEqualTo(multi); + } + } + } +} diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index f21c2769fb..2c4b494931 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -387,7 +387,7 @@ impl OpRead { pub struct OpReader { /// The range of the read request. /// - /// Not available for `reader``. + /// Not available for `reader`. range: BytesRange, /// The concurrent requests that reader can send. concurrent: usize, diff --git a/core/src/types/blocking_read/blocking_reader.rs b/core/src/types/blocking_read/blocking_reader.rs index e87528fcea..6dfedf7b1d 100644 --- a/core/src/types/blocking_read/blocking_reader.rs +++ b/core/src/types/blocking_read/blocking_reader.rs @@ -151,7 +151,7 @@ impl BlockingReader { /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] - pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator { + pub fn into_bytes_iterator(self, range: impl RangeBounds<u64>) -> StdBytesIterator { StdBytesIterator::new(self.inner, range) } } diff --git a/core/src/types/blocking_read/std_bytes_iterator.rs b/core/src/types/blocking_read/std_bytes_iterator.rs index dba05ae734..7f8752a1c6 100644 --- a/core/src/types/blocking_read/std_bytes_iterator.rs +++ b/core/src/types/blocking_read/std_bytes_iterator.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::collections::Bound; use std::io; +use std::ops::RangeBounds; use bytes::Buf; use bytes::Bytes; @@ -30,7 +32,7 @@ use crate::raw::*; pub struct StdBytesIterator { inner: oio::BlockingReader, offset: u64, - size: u64, + end: u64, cap: usize, cur: u64, @@ -39,11 +41,23 @@ pub struct StdBytesIterator { impl StdBytesIterator { /// NOTE: don't allow users to create StdIterator directly. #[inline] - pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> Self { + pub(crate) fn new(r: oio::BlockingReader, range: impl RangeBounds<u64>) -> Self { + let start = match range.start_bound().cloned() { + Bound::Included(start) => start, + Bound::Excluded(start) => start + 1, + Bound::Unbounded => 0, + }; + + let end = match range.end_bound().cloned() { + Bound::Included(end) => Some(end + 1), + Bound::Excluded(end) => Some(end), + Bound::Unbounded => None, + }; + StdBytesIterator { inner: r, - offset: range.start, - size: range.end - range.start, + offset: start, + end: end.unwrap_or(u64::MAX), // TODO: should use services preferred io size. cap: 4 * 1024 * 1024, cur: 0, @@ -61,12 +75,12 @@ impl Iterator for StdBytesIterator { type Item = io::Result<Bytes>; fn next(&mut self) -> Option<Self::Item> { - if self.cur >= self.size { + if self.offset + self.cur >= self.end { return None; } let next_offset = self.offset + self.cur; - let next_size = (self.size - self.cur).min(self.cap as u64) as usize; + let next_size = (self.end - self.offset).min(self.cap as u64) as usize; match self.inner.read_at(next_offset, next_size) { Ok(buf) if !buf.has_remaining() => None, Ok(mut buf) => {