This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e013c67f24 Aliyun: Remove spring-boot dependency (#11291)
e013c67f24 is described below
commit e013c67f24f0c0c256dd6a0dfeb872b7959ac267
Author: JB Onofré <[email protected]>
AuthorDate: Mon Oct 28 18:42:08 2024 +0100
Aliyun: Remove spring-boot dependency (#11291)
---
.../org/apache/iceberg/aliyun/TestUtility.java | 2 +-
.../iceberg/aliyun/oss/TestOSSOutputStream.java | 2 +-
.../iceberg/aliyun/oss/mock/AliyunOSSMock.java | 569 +++++++++++++++++++++
.../iceberg/aliyun/oss/mock/AliyunOSSMockApp.java | 158 ------
.../aliyun/oss/mock/AliyunOSSMockExtension.java | 23 +-
.../oss/mock/AliyunOSSMockLocalController.java | 522 -------------------
.../aliyun/oss/mock/AliyunOSSMockLocalStore.java | 14 +-
.../org/apache/iceberg/aliyun/oss/mock/Range.java | 43 --
build.gradle | 10 -
gradle/libs.versions.toml | 5 -
10 files changed, 587 insertions(+), 761 deletions(-)
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestUtility.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestUtility.java
index 072886f6b8..430eb6a50b 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestUtility.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestUtility.java
@@ -65,7 +65,7 @@ public class TestUtility {
} else {
LOG.info(
"Initializing AliyunOSSExtension implementation with default
AliyunOSSMockExtension");
- extension = AliyunOSSMockExtension.builder().silent().build();
+ extension = AliyunOSSMockExtension.builder().build();
}
return extension;
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
index 8fc661e5be..9a7b774b28 100644
---
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
+++
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
@@ -70,7 +70,7 @@ public class TestOSSOutputStream extends AliyunOSSTestBase {
reset(ossMock);
// Write large file.
- writeAndVerify(ossMock, uri, randomData(32 * 1024 * 1024), arrayWrite);
+ writeAndVerify(ossMock, uri, randomData(32 * 1024), arrayWrite);
verify(ossMock, times(1)).putObject(any());
reset(ossMock);
}
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java
new file mode 100644
index 0000000000..7894c1857d
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aliyun.oss.mock;
+
+import com.aliyun.oss.OSSErrorCode;
+import com.aliyun.oss.model.Bucket;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+
+public class AliyunOSSMock {
+
+ static final String PROP_ROOT_DIR = "root-dir";
+ static final String ROOT_DIR_DEFAULT = "/tmp";
+
+ static final String PROP_HTTP_PORT = "server.port";
+ static final int PORT_HTTP_PORT_DEFAULT = 9393;
+
+ private final AliyunOSSMockLocalStore localStore;
+ private final HttpServer httpServer;
+
+ /**
+  * Creates a mock from the given properties and starts its HTTP server.
+  *
+  * <p>{@code root-dir} and {@code server.port} are read from {@code properties}; when absent,
+  * {@link #ROOT_DIR_DEFAULT} and {@link #PORT_HTTP_PORT_DEFAULT} are used.
+  *
+  * @param properties configuration of the mock
+  * @return the started mock
+  * @throws IOException if the HTTP server cannot be created
+  */
+ public static AliyunOSSMock start(Map<String, Object> properties) throws IOException {
+   String rootDir = properties.getOrDefault(PROP_ROOT_DIR, ROOT_DIR_DEFAULT).toString();
+   String port = properties.getOrDefault(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT).toString();
+
+   AliyunOSSMock mock = new AliyunOSSMock(rootDir, Integer.parseInt(port));
+   mock.start();
+   return mock;
+ }
+
+ /**
+  * Builds the local store from {@code rootDir} and creates (without starting) an HTTP server
+  * bound to {@code localhost:serverPort}.
+  *
+  * @throws IOException propagated from {@code HttpServer.create} or the local store constructor
+  */
+ private AliyunOSSMock(String rootDir, int serverPort) throws IOException {
+ localStore = new AliyunOSSMockLocalStore(rootDir);
+ httpServer = HttpServer.create(new InetSocketAddress("localhost",
serverPort), 0);
+ }
+
+ /** Registers the request handler on the root context {@code "/"} and starts the HTTP server. */
+ private void start() {
+ httpServer.createContext("/", new AliyunHttpHandler());
+ httpServer.start();
+ }
+
+ /** Stops the HTTP server, passing a delay of 0 to {@code HttpServer.stop}. */
+ public void stop() {
+ httpServer.stop(0);
+ }
+
+ /**
+  * Routes each HTTP request received by the mock server to the matching bucket or object
+  * operation on the local store.
+  *
+  * <p>The first path segment is the bucket name and the rest of the path, if any, is the object
+  * key. PUT and DELETE are handled for buckets, and PUT, DELETE, HEAD and GET for objects; any
+  * other method is answered with 405.
+  */
+ private class AliyunHttpHandler implements HttpHandler {
+
+   @Override
+   public void handle(HttpExchange httpExchange) throws IOException {
+     try {
+       // getPath() is already decoded and never contains the query string.
+       String path = httpExchange.getRequestURI().getPath().substring(1);
+       int separator = path.indexOf('/');
+       String bucketName = separator < 0 ? path : path.substring(0, separator);
+       // Everything after the bucket is the key, so keys containing '/' are kept intact.
+       String objectName = separator < 0 ? "" : path.substring(separator + 1);
+       String method = httpExchange.getRequestMethod();
+
+       if (objectName.isEmpty()) {
+         // bucket operations
+         if (method.equals("PUT")) {
+           putBucket(bucketName, httpExchange);
+         } else if (method.equals("DELETE")) {
+           deleteBucket(bucketName, httpExchange);
+         } else {
+           handleResponse(httpExchange, 405, "Method Not Allowed", "text/plain");
+         }
+       } else {
+         // object operations
+         if (method.equals("PUT")) {
+           putObject(bucketName, objectName, httpExchange);
+         } else if (method.equals("DELETE")) {
+           deleteObject(bucketName, objectName, httpExchange);
+         } else if (method.equals("HEAD")) {
+           getObjectMeta(bucketName, objectName, httpExchange);
+         } else if (method.equals("GET")) {
+           getObject(bucketName, objectName, httpExchange);
+         } else {
+           handleResponse(httpExchange, 405, "Method Not Allowed", "text/plain");
+         }
+       }
+     } finally {
+       // Always end the exchange so a client is never left waiting for a response.
+       httpExchange.close();
+     }
+   }
+
+   /** Creates the bucket, or answers 409 if it already exists. */
+   private void putBucket(String bucketName, HttpExchange httpExchange) throws IOException {
+     if (localStore.getBucket(bucketName) != null) {
+       String errorMessage =
+           createErrorResponse(
+               OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already exists.");
+       handleResponse(httpExchange, 409, errorMessage, "application/xml");
+       return;
+     }
+     localStore.createBucket(bucketName);
+     handleResponse(httpExchange, 200, "OK", "application/xml");
+   }
+
+   /** Deletes the bucket; answers 404 if it is missing and 409 if the store refuses to delete. */
+   private void deleteBucket(String bucketName, HttpExchange httpExchange) throws IOException {
+     if (!verifyBucketExistence(bucketName, httpExchange)) {
+       return;
+     }
+     try {
+       localStore.deleteBucket(bucketName);
+     } catch (Exception e) {
+       String errorMessage =
+           createErrorResponse(
+               OSSErrorCode.BUCKET_NOT_EMPTY, "The bucket you tried to delete is not empty.");
+       handleResponse(httpExchange, 409, errorMessage, "application/xml");
+       // The error has been sent; do not fall through to the success response.
+       return;
+     }
+     handleResponse(httpExchange, 200, "OK", "application/xml");
+   }
+
+   /** Stores the request body as an object and returns its ETag and Last-Modified headers. */
+   private void putObject(String bucketName, String objectName, HttpExchange httpExchange)
+       throws IOException {
+     if (!verifyBucketExistence(bucketName, httpExchange)) {
+       return;
+     }
+
+     ObjectMetadata metadata;
+     try (InputStream inputStream = httpExchange.getRequestBody()) {
+       metadata =
+           localStore.putObject(
+               bucketName,
+               objectName,
+               inputStream,
+               httpExchange.getRequestHeaders().getFirst("Content-Type"),
+               httpExchange.getRequestHeaders().getFirst("Content-Headers"),
+               ImmutableMap.of());
+     } catch (Exception e) {
+       // Only a failed store operation becomes a 500; nothing has been sent at this point.
+       handleResponse(httpExchange, 500, "Internal Server Error", "text/plain");
+       return;
+     }
+
+     httpExchange.getResponseHeaders().add("ETag", metadata.getContentMD5());
+     httpExchange
+         .getResponseHeaders()
+         .add("Last-Modified", createDate(metadata.getLastModificationDate()));
+     handleResponse(httpExchange, 200, "OK", "text/plain");
+   }
+
+   /** Deletes the object from an existing bucket. */
+   private void deleteObject(String bucketName, String objectName, HttpExchange httpExchange)
+       throws IOException {
+     if (!verifyBucketExistence(bucketName, httpExchange)) {
+       return;
+     }
+     localStore.deleteObject(bucketName, objectName);
+
+     handleResponse(httpExchange, 200, "OK", "text/plain");
+   }
+
+   /** Answers a HEAD request with the object's ETag, Last-Modified and Content-Length. */
+   private void getObjectMeta(String bucketName, String objectName, HttpExchange httpExchange)
+       throws IOException {
+     if (!verifyBucketExistence(bucketName, httpExchange)) {
+       return;
+     }
+
+     ObjectMetadata metadata = verifyObjectExistence(bucketName, objectName);
+     if (metadata == null) {
+       String errorMessage =
+           createErrorResponse(OSSErrorCode.NO_SUCH_KEY, "The specify oss key does not exists.");
+       handleResponse(httpExchange, 404, errorMessage, "application/xml");
+       return;
+     }
+
+     httpExchange.getResponseHeaders().add("ETag", metadata.getContentMD5());
+     httpExchange
+         .getResponseHeaders()
+         .add("Last-Modified", createDate(metadata.getLastModificationDate()));
+     httpExchange
+         .getResponseHeaders()
+         .add("Content-Length", Long.toString(metadata.getContentLength()));
+
+     handleResponse(httpExchange, 200, "OK", "text/plain");
+   }
+
+   /**
+    * Serves the object content, honoring a single {@code Range: bytes=start-end} header.
+    *
+    * <p>Supported forms are {@code start-end}, {@code start-} and the suffix form {@code -n}
+    * (last n bytes). A header that cannot be parsed is ignored and the whole object is returned.
+    */
+   private void getObject(String bucketName, String objectName, HttpExchange httpExchange)
+       throws IOException {
+     if (!verifyBucketExistence(bucketName, httpExchange)) {
+       return;
+     }
+
+     ObjectMetadata metadata = verifyObjectExistence(bucketName, objectName);
+     if (metadata == null) {
+       String errorMessage =
+           createErrorResponse(OSSErrorCode.NO_SUCH_KEY, "The specify oss key does not exists.");
+       handleResponse(httpExchange, 404, errorMessage, "application/xml");
+       return;
+     }
+
+     long fileSize = metadata.getContentLength();
+     long[] range = parseRange(httpExchange.getRequestHeaders().getFirst("Range"));
+     if (range == null) {
+       sendObject(httpExchange, 200, metadata, 0, fileSize);
+       return;
+     }
+
+     long rangeStart = range[0];
+     long rangeEnd = range[1];
+     long skipSize;
+     long bytesToRead;
+     if (rangeStart < 0 && rangeEnd < 0) {
+       // "bytes=-": no bound at all, serve everything.
+       skipSize = 0;
+       bytesToRead = fileSize;
+     } else if (rangeStart < 0) {
+       // Suffix range: the last rangeEnd bytes, capped at the object size.
+       bytesToRead = Math.min(rangeEnd, fileSize);
+       skipSize = fileSize - bytesToRead;
+     } else {
+       long lastByte = rangeEnd < 0 ? fileSize - 1 : Math.min(rangeEnd, fileSize - 1);
+       skipSize = rangeStart;
+       bytesToRead = lastByte - rangeStart + 1;
+     }
+
+     if (bytesToRead < 0 || skipSize > fileSize) {
+       // -1 declares that no response body follows.
+       httpExchange.sendResponseHeaders(416, -1);
+       return;
+     }
+
+     if (bytesToRead > 0) {
+       // Content-Range carries the inclusive first and last byte positions.
+       httpExchange
+           .getResponseHeaders()
+           .add(
+               "Content-Range",
+               "bytes " + skipSize + "-" + (skipSize + bytesToRead - 1) + "/" + fileSize);
+     }
+     sendObject(httpExchange, 206, metadata, skipSize, bytesToRead);
+   }
+
+   /**
+    * Parses a single-range {@code bytes=start-end} header.
+    *
+    * @return {@code {start, end}} with -1 for an omitted bound, or {@code null} when the header
+    *     is absent, malformed or lists several ranges
+    */
+   private long[] parseRange(String header) {
+     if (header == null) {
+       return null;
+     }
+     String spec = header.trim();
+     if (!spec.startsWith("bytes=")) {
+       return null;
+     }
+     spec = spec.substring("bytes=".length());
+     int dash = spec.indexOf('-');
+     if (dash < 0 || spec.indexOf(',') >= 0) {
+       return null;
+     }
+     String first = spec.substring(0, dash).trim();
+     String last = spec.substring(dash + 1).trim();
+     try {
+       long start = first.isEmpty() ? -1 : Long.parseLong(first);
+       long end = last.isEmpty() ? -1 : Long.parseLong(last);
+       if ((!first.isEmpty() && start < 0) || (!last.isEmpty() && end < 0)) {
+         return null;
+       }
+       return new long[] {start, end};
+     } catch (NumberFormatException e) {
+       return null;
+     }
+   }
+
+   /** Sends the common object headers followed by {@code length} bytes starting at offset. */
+   private void sendObject(
+       HttpExchange httpExchange, int status, ObjectMetadata metadata, long offset, long length)
+       throws IOException {
+     httpExchange.getResponseHeaders().add("Accept-Ranges", "bytes");
+     httpExchange.getResponseHeaders().add("ETag", metadata.getContentMD5());
+     httpExchange
+         .getResponseHeaders()
+         .add("Last-Modified", createDate(metadata.getLastModificationDate()));
+     httpExchange.getResponseHeaders().add("Content-Type", metadata.getContentType());
+
+     if (length == 0) {
+       // A length of 0 would select chunked encoding; -1 declares an empty body instead.
+       httpExchange.sendResponseHeaders(status, -1);
+       return;
+     }
+
+     httpExchange.sendResponseHeaders(status, length);
+     try (OutputStream outputStream = httpExchange.getResponseBody();
+         FileInputStream fis = new FileInputStream(metadata.getDataFile())) {
+       // skip() may skip fewer bytes than asked; skipFully guarantees the offset.
+       ByteStreams.skipFully(fis, offset);
+       ByteStreams.copy(new BoundedInputStream(fis, length), outputStream);
+     }
+   }
+
+   /**
+    * Checks that the bucket exists, answering 404 when it does not.
+    *
+    * @return {@code true} if the bucket exists; {@code false} if a 404 response has been sent
+    *     and the caller must stop processing the request
+    */
+   private boolean verifyBucketExistence(String bucketName, HttpExchange httpExchange)
+       throws IOException {
+     Bucket bucket = localStore.getBucket(bucketName);
+     if (bucket == null) {
+       String errorMessage =
+           createErrorResponse(
+               OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not exist.");
+       handleResponse(httpExchange, 404, errorMessage, "application/xml");
+       return false;
+     }
+     return true;
+   }
+
+   /** Returns the object's metadata, or {@code null} when the store cannot provide it. */
+   private ObjectMetadata verifyObjectExistence(String bucketName, String fileName) {
+     ObjectMetadata objectMetadata = null;
+     try {
+       objectMetadata = localStore.getObjectMetadata(bucketName, fileName);
+     } catch (IOException ignored) {
+       // An unreadable object is reported to the client as a missing key.
+     }
+
+     return objectMetadata;
+   }
+
+   /**
+    * Sends a complete response with the given status, payload and content type.
+    *
+    * <p>The payload is encoded as UTF-8 and the declared length is the encoded byte count. No
+    * body is written for HEAD requests or for an empty payload.
+    */
+   private void handleResponse(
+       HttpExchange httpExchange, int responseCode, String responsePayload, String contentType)
+       throws IOException {
+     httpExchange.getResponseHeaders().put("Content-Type", Collections.singletonList(contentType));
+     byte[] body = responsePayload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+     if (body.length == 0 || "HEAD".equals(httpExchange.getRequestMethod())) {
+       httpExchange.sendResponseHeaders(responseCode, -1);
+       return;
+     }
+     httpExchange.sendResponseHeaders(responseCode, body.length);
+     try (OutputStream outputStream = httpExchange.getResponseBody()) {
+       outputStream.write(body);
+     }
+   }
+
+   /** Builds the XML error document carrying the error code and message. */
+   private String createErrorResponse(String errorCode, String message) {
+     StringBuilder builder = new StringBuilder();
+     builder.append("<Error>");
+     builder.append("<Code>").append(errorCode).append("</Code>");
+     builder.append("<Message>").append(message).append("</Message>");
+     builder.append("</Error>");
+     return builder.toString();
+   }
+
+   /** Formats an epoch-millisecond timestamp as an RFC 1123 date in GMT. */
+   private String createDate(long timestamp) {
+     java.util.Date date = new java.util.Date(timestamp);
+     ZonedDateTime dateTime = date.toInstant().atZone(ZoneId.of("GMT"));
+     return dateTime.format(DateTimeFormatter.RFC_1123_DATE_TIME);
+   }
+ }
+
+ /**
+ * Reads bytes up to a maximum length, if its count goes above that, it
stops.
+ *
+ * <p>This is useful to wrap ServletInputStreams. The ServletInputStream
will block if you try to
+ * read content from it that isn't there, because it doesn't know whether
the content hasn't
+ * arrived yet or whether the content has finished. So, one of these,
initialized with the
+ * Content-length sent in the ServletInputStream's header, will stop it
blocking, providing it's
+ * been sent with a correct content length.
+ *
+ * <p>This code is borrowed from `org.apache.commons:commons-io`
+ */
+ public class BoundedInputStream extends FilterInputStream {
+
+ /** The max count of bytes to read. */
+ private final long maxCount;
+
+ /** The count of bytes read. */
+ private long count;
+
+ /** The marked position. */
+ private long mark = -1;
+
+ /** Flag if close should be propagated. */
+ private boolean propagateClose = true;
+
+ /**
+ * Constructs a new {@link BoundedInputStream} that wraps the given input
stream and is
+ * unlimited.
+ *
+ * @param in The wrapped input stream.
+ */
+ public BoundedInputStream(final InputStream in) {
+ this(in, -1);
+ }
+
+ /**
+ * Constructs a new {@link BoundedInputStream} that wraps the given input
stream and limits it
+ * to a certain size.
+ *
+ * @param inputStream The wrapped input stream.
+ * @param maxLength The maximum number of bytes to return.
+ */
+ public BoundedInputStream(final InputStream inputStream, final long
maxLength) {
+ // Some badly designed methods - e.g. the servlet API - overload length
+ // such that "-1" means stream finished
+ super(inputStream);
+ this.maxCount = maxLength;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int available() throws IOException {
+ if (isMaxLength()) {
+ onMaxLength(maxCount, count);
+ return 0;
+ }
+ return in.available();
+ }
+
+ /**
+ * Invokes the delegate's {@code close()} method if {@link
#isPropagateClose()} is {@code true}.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public void close() throws IOException {
+ if (propagateClose) {
+ in.close();
+ }
+ }
+
+ /**
+ * Gets the count of bytes read.
+ *
+ * @return The count of bytes read.
+ * @since 2.12.0
+ */
+ public long getCount() {
+ return count;
+ }
+
+ /**
+ * Gets the max count of bytes to read.
+ *
+ * @return The max count of bytes to read.
+ * @since 2.12.0
+ */
+ public long getMaxLength() {
+ return maxCount;
+ }
+
+ private boolean isMaxLength() {
+ return maxCount >= 0 && count >= maxCount;
+ }
+
+ /**
+ * Tests whether the {@link #close()} method should propagate to the underlying {@link
+ * InputStream}.
+ *
+ * @return {@code true} if calling {@link #close()} propagates to the
{@code close()} method of
+ * the underlying stream or {@code false} if it does not.
+ */
+ public boolean isPropagateClose() {
+ return propagateClose;
+ }
+
+ /**
+ * Sets whether the {@link #close()} method should propagate to the underlying {@link
+ * InputStream}.
+ *
+ * @param propagateClose {@code true} if calling {@link #close()}
propagates to the {@code
+ * close()} method of the underlying stream or {@code false} if it
does not.
+ */
+ public void setPropagateClose(final boolean propagateClose) {
+ this.propagateClose = propagateClose;
+ }
+
+ /**
+ * Invokes the delegate's {@code mark(int)} method.
+ *
+ * @param readlimit read ahead limit
+ */
+ @Override
+ public synchronized void mark(final int readlimit) {
+ in.mark(readlimit);
+ mark = count;
+ }
+
+ /**
+ * Invokes the delegate's {@code markSupported()} method.
+ *
+ * @return true if mark is supported, otherwise false
+ */
+ @Override
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ /**
+ * A caller has caused a request that would cross the {@code maxLength}
boundary.
+ *
+ * @param maxLength The max count of bytes to read.
+ * @param bytesRead The count of bytes read.
+ * @throws IOException Subclasses may throw.
+ * @since 2.12.0
+ */
+ protected void onMaxLength(final long maxLength, final long bytesRead)
throws IOException {
+ // for subclasses
+ }
+
+ /**
+  * Invokes the delegate's {@code read()} method if the current position is less than the limit.
+  *
+  * <p>Only bytes actually returned are counted, so hitting the end of the wrapped stream does
+  * not advance {@link #getCount()}.
+  *
+  * @return the byte read or -1 if the end of stream or the limit has been reached.
+  * @throws IOException if an I/O error occurs.
+  */
+ @Override
+ public int read() throws IOException {
+   if (isMaxLength()) {
+     onMaxLength(maxCount, count);
+     return -1;
+   }
+   final int result = in.read();
+   if (result != -1) {
+     // Do not count the end-of-stream marker as a byte read.
+     count++;
+   }
+   return result;
+ }
+
+ /**
+ * Invokes the delegate's {@code read(byte[])} method.
+ *
+ * @param b the buffer to read the bytes into
+ * @return the number of bytes read or -1 if the end of stream or the
limit has been reached.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public int read(final byte[] b) throws IOException {
+ return this.read(b, 0, b.length);
+ }
+
+ /**
+ * Invokes the delegate's {@code read(byte[], int, int)} method.
+ *
+ * @param b the buffer to read the bytes into
+ * @param off The start offset
+ * @param len The number of bytes to read
+ * @return the number of bytes read or -1 if the end of stream or the
limit has been reached.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws
IOException {
+ if (isMaxLength()) {
+ onMaxLength(maxCount, count);
+ return -1;
+ }
+ final long maxRead = maxCount >= 0 ? Math.min(len, maxCount - count) :
len;
+ final int bytesRead = in.read(b, off, (int) maxRead);
+
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ count += bytesRead;
+ return bytesRead;
+ }
+
+ /**
+ * Invokes the delegate's {@code reset()} method.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ in.reset();
+ count = mark;
+ }
+
+ /**
+ * Invokes the delegate's {@code skip(long)} method.
+ *
+ * @param n the number of bytes to skip
+ * @return the actual number of bytes skipped
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public long skip(final long n) throws IOException {
+ final long toSkip = maxCount >= 0 ? Math.min(n, maxCount - count) : n;
+ final long skippedBytes = in.skip(toSkip);
+ count += skippedBytes;
+ return skippedBytes;
+ }
+
+ /**
+ * Invokes the delegate's {@code toString()} method.
+ *
+ * @return the delegate's {@code toString()}
+ */
+ @Override
+ public String toString() {
+ return in.toString();
+ }
+ }
+}
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java
deleted file mode 100644
index ea0ef0fe4d..0000000000
---
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.aliyun.oss.mock;
-
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.Banner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import
org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
-import org.springframework.boot.builder.SpringApplicationBuilder;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.convert.converter.Converter;
-import org.springframework.http.MediaType;
-import
org.springframework.http.converter.xml.MappingJackson2XmlHttpMessageConverter;
-import org.springframework.util.StringUtils;
-import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-
-@SuppressWarnings("checkstyle:AnnotationUseStyle")
-@Configuration
-@EnableAutoConfiguration(
- exclude = {SecurityAutoConfiguration.class},
- excludeName = {
-
"org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration"
- })
-@ComponentScan
-public class AliyunOSSMockApp {
-
- static final String PROP_ROOT_DIR = "root-dir";
-
- static final String PROP_HTTP_PORT = "server.port";
- static final int PORT_HTTP_PORT_DEFAULT = 9393;
-
- static final String PROP_SILENT = "silent";
-
- @Autowired private ConfigurableApplicationContext context;
-
- public static AliyunOSSMockApp start(Map<String, Object> properties,
String... args) {
- Map<String, Object> defaults = Maps.newHashMap();
- defaults.put(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT);
-
- Banner.Mode bannerMode = Banner.Mode.CONSOLE;
-
- if (Boolean.parseBoolean(String.valueOf(properties.remove(PROP_SILENT)))) {
- defaults.put("logging.level.root", "WARN");
- bannerMode = Banner.Mode.OFF;
- }
-
- final ConfigurableApplicationContext ctx =
- new SpringApplicationBuilder(AliyunOSSMockApp.class)
- .properties(defaults)
- .properties(properties)
- .bannerMode(bannerMode)
- .run(args);
-
- return ctx.getBean(AliyunOSSMockApp.class);
- }
-
- public void stop() {
- SpringApplication.exit(context, () -> 0);
- }
-
- @Configuration
- static class Config implements WebMvcConfigurer {
-
- @Bean
- Converter<String, Range> rangeConverter() {
- return new RangeConverter();
- }
-
- /**
- * Creates an HttpMessageConverter for XML.
- *
- * @return The configured {@link MappingJackson2XmlHttpMessageConverter}.
- */
- @Bean
- public MappingJackson2XmlHttpMessageConverter getMessageConverter() {
- List<MediaType> mediaTypes = Lists.newArrayList();
- mediaTypes.add(MediaType.APPLICATION_XML);
- mediaTypes.add(MediaType.APPLICATION_FORM_URLENCODED);
- mediaTypes.add(MediaType.APPLICATION_OCTET_STREAM);
-
- final MappingJackson2XmlHttpMessageConverter xmlConverter =
- new MappingJackson2XmlHttpMessageConverter();
- xmlConverter.setSupportedMediaTypes(mediaTypes);
-
- return xmlConverter;
- }
- }
-
- private static class RangeConverter implements Converter<String, Range> {
-
- private static final Pattern REQUESTED_RANGE_PATTERN =
- Pattern.compile("^bytes=((\\d*)-(\\d*))((,\\d*-\\d*)*)");
-
- @Override
- public Range convert(String rangeString) {
- Preconditions.checkNotNull(rangeString, "Range value should not be
null.");
-
- final Range range;
-
- // parsing a range specification of format: "bytes=start-end" - multiple
ranges not supported
- final Matcher matcher =
REQUESTED_RANGE_PATTERN.matcher(rangeString.trim());
- if (matcher.matches()) {
- final String rangeStart = matcher.group(2);
- final String rangeEnd = matcher.group(3);
-
- long start = StringUtils.isEmpty(rangeStart) ? -1L :
Long.parseLong(rangeStart);
- long end = StringUtils.isEmpty(rangeEnd) ? Long.MAX_VALUE :
Long.parseLong(rangeEnd);
- range = new Range(start, end);
-
- if (matcher.groupCount() == 5 && !"".equals(matcher.group(4))) {
- throw new IllegalArgumentException(
- "Unsupported range specification. Only single range
specifications allowed");
- }
- if (range.start() != -1 && range.start() < 0) {
- throw new IllegalArgumentException(
- "Unsupported range specification. A start byte must be
supplied");
- }
-
- if (range.end() != -1 && range.end() < range.start()) {
- throw new IllegalArgumentException(
- "Range header is malformed. End byte is smaller than start
byte.");
- }
- } else {
- // Per Aliyun OSS behavior, return whole object content for illegal
header
- range = new Range(0, Long.MAX_VALUE);
- }
-
- return range;
- }
- }
-}
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
index 9aae5b777a..d4cb106150 100644
---
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
+++
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
@@ -34,7 +34,7 @@ public class AliyunOSSMockExtension implements
AliyunOSSExtension {
private final Map<String, Object> properties;
- private AliyunOSSMockApp ossMockApp;
+ private AliyunOSSMock ossMock;
private AliyunOSSMockExtension(Map<String, Object> properties) {
this.properties = properties;
@@ -51,12 +51,16 @@ public class AliyunOSSMockExtension implements
AliyunOSSExtension {
@Override
public void start() {
- ossMockApp = AliyunOSSMockApp.start(properties);
+ try {
+ ossMock = AliyunOSSMock.start(properties);
+ } catch (Exception e) {
+ // Keep the underlying failure as the cause so the reason the mock did not start is visible.
+ throw new RuntimeException("Can't start OSS Mock", e);
+ }
}
@Override
public void stop() {
- ossMockApp.stop();
+ ossMock.stop();
}
@Override
@@ -65,12 +69,12 @@ public class AliyunOSSMockExtension implements
AliyunOSSExtension {
String.format(
"http://localhost:%s",
properties.getOrDefault(
- AliyunOSSMockApp.PROP_HTTP_PORT,
AliyunOSSMockApp.PORT_HTTP_PORT_DEFAULT));
+ AliyunOSSMock.PROP_HTTP_PORT,
AliyunOSSMock.PORT_HTTP_PORT_DEFAULT));
return new OSSClientBuilder().build(endpoint, "foo", "bar");
}
private File rootDir() {
- Object rootDir = properties.get(AliyunOSSMockApp.PROP_ROOT_DIR);
+ Object rootDir = properties.get(AliyunOSSMock.PROP_ROOT_DIR);
Preconditions.checkNotNull(rootDir, "Root directory cannot be null");
return new File(rootDir.toString());
}
@@ -103,20 +107,15 @@ public class AliyunOSSMockExtension implements
AliyunOSSExtension {
public static class Builder {
private final Map<String, Object> props = Maps.newHashMap();
- public Builder silent() {
- props.put(AliyunOSSMockApp.PROP_SILENT, true);
- return this;
- }
-
public AliyunOSSExtension build() {
- String rootDir = (String) props.get(AliyunOSSMockApp.PROP_ROOT_DIR);
+ String rootDir = (String) props.get(AliyunOSSMock.PROP_ROOT_DIR);
if (Strings.isNullOrEmpty(rootDir)) {
File dir =
new File(
System.getProperty("java.io.tmpdir"),
"oss-mock-file-store-" + System.currentTimeMillis());
rootDir = dir.getAbsolutePath();
- props.put(AliyunOSSMockApp.PROP_ROOT_DIR, rootDir);
+ props.put(AliyunOSSMock.PROP_ROOT_DIR, rootDir);
}
File root = new File(rootDir);
root.deleteOnExit();
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java
deleted file mode 100644
index 7f7546ec23..0000000000
---
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.aliyun.oss.mock;
-
-import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
-import static org.springframework.http.HttpStatus.OK;
-import static org.springframework.http.HttpStatus.PARTIAL_CONTENT;
-import static
org.springframework.http.HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
-
-import com.aliyun.oss.OSSErrorCode;
-import com.aliyun.oss.model.Bucket;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonRootName;
-import java.io.FileInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.ControllerAdvice;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestHeader;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
-import
org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
-
-@RestController
-public class AliyunOSSMockLocalController {
- private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSMockLocalController.class);
-
- @Autowired private AliyunOSSMockLocalStore localStore;
-
- private static String filenameFrom(@PathVariable String bucketName,
HttpServletRequest request) {
- String requestUri = request.getRequestURI();
- return requestUri.substring(requestUri.indexOf(bucketName) +
bucketName.length() + 1);
- }
-
- @RequestMapping(value = "/{bucketName}", method = RequestMethod.PUT,
produces = "application/xml")
- public void putBucket(@PathVariable String bucketName) throws IOException {
- if (localStore.getBucket(bucketName) != null) {
- throw new OssException(
- 409, OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already
exists.");
- }
-
- localStore.createBucket(bucketName);
- }
-
- @RequestMapping(
- value = "/{bucketName}",
- method = RequestMethod.DELETE,
- produces = "application/xml")
- public void deleteBucket(@PathVariable String bucketName) throws IOException
{
- verifyBucketExistence(bucketName);
-
- localStore.deleteBucket(bucketName);
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.PUT)
- public ResponseEntity<String> putObject(
- @PathVariable String bucketName, HttpServletRequest request) {
- verifyBucketExistence(bucketName);
- String filename = filenameFrom(bucketName, request);
- try (ServletInputStream inputStream = request.getInputStream()) {
- ObjectMetadata metadata =
- localStore.putObject(
- bucketName,
- filename,
- inputStream,
- request.getContentType(),
- request.getHeader(HttpHeaders.CONTENT_ENCODING),
- ImmutableMap.of());
-
- HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.setETag("\"" + metadata.getContentMD5() + "\"");
- responseHeaders.setLastModified(metadata.getLastModificationDate());
-
- return new ResponseEntity<>(responseHeaders, OK);
- } catch (Exception e) {
- LOG.error("Failed to put object - bucket: {} - object: {}", bucketName,
filename, e);
- return new ResponseEntity<>(e.getMessage(), INTERNAL_SERVER_ERROR);
- }
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.DELETE)
- public void deleteObject(@PathVariable String bucketName, HttpServletRequest
request) {
- verifyBucketExistence(bucketName);
-
- localStore.deleteObject(bucketName, filenameFrom(bucketName, request));
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.HEAD)
- public ResponseEntity<String> getObjectMeta(
- @PathVariable String bucketName, HttpServletRequest request) {
- verifyBucketExistence(bucketName);
- ObjectMetadata metadata = verifyObjectExistence(bucketName,
filenameFrom(bucketName, request));
-
- HttpHeaders headers = new HttpHeaders();
- headers.setETag("\"" + metadata.getContentMD5() + "\"");
- headers.setLastModified(metadata.getLastModificationDate());
- headers.setContentLength(metadata.getContentLength());
-
- return new ResponseEntity<>(headers, OK);
- }
-
- @SuppressWarnings("checkstyle:AnnotationUseStyle")
- @RequestMapping(
- value = "/{bucketName:.+}/**",
- method = RequestMethod.GET,
- produces = "application/xml")
- public void getObject(
- @PathVariable String bucketName,
- @RequestHeader(value = "Range", required = false) Range range,
- HttpServletRequest request,
- HttpServletResponse response)
- throws IOException {
- verifyBucketExistence(bucketName);
-
- String filename = filenameFrom(bucketName, request);
- ObjectMetadata metadata = verifyObjectExistence(bucketName, filename);
-
- if (range != null) {
- long fileSize = metadata.getContentLength();
- long bytesToRead = Math.min(fileSize - 1, range.end()) - range.start() +
1;
- long skipSize = range.start();
- if (range.start() == -1) {
- bytesToRead = Math.min(fileSize - 1, range.end());
- skipSize = fileSize - range.end();
- }
- if (range.end() == -1) {
- bytesToRead = fileSize - range.start();
- }
- if (bytesToRead < 0 || fileSize < range.start()) {
- response.setStatus(REQUESTED_RANGE_NOT_SATISFIABLE.value());
- response.flushBuffer();
- return;
- }
-
- response.setStatus(PARTIAL_CONTENT.value());
- response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
- response.setHeader(
- HttpHeaders.CONTENT_RANGE,
- String.format(
- "bytes %s-%s/%s",
- range.start(), bytesToRead + range.start() + 1,
metadata.getContentLength()));
- response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() +
"\"");
- response.setDateHeader(HttpHeaders.LAST_MODIFIED,
metadata.getLastModificationDate());
- response.setContentType(metadata.getContentType());
- response.setContentLengthLong(bytesToRead);
-
- try (OutputStream outputStream = response.getOutputStream()) {
- try (FileInputStream fis = new
FileInputStream(metadata.getDataFile())) {
- fis.skip(skipSize);
- ByteStreams.copy(new BoundedInputStream(fis, bytesToRead),
outputStream);
- }
- }
- } else {
- response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
- response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() +
"\"");
- response.setDateHeader(HttpHeaders.LAST_MODIFIED,
metadata.getLastModificationDate());
- response.setContentType(metadata.getContentType());
- response.setContentLengthLong(metadata.getContentLength());
-
- try (OutputStream outputStream = response.getOutputStream()) {
- try (FileInputStream fis = new
FileInputStream(metadata.getDataFile())) {
- ByteStreams.copy(fis, outputStream);
- }
- }
- }
- }
-
- private void verifyBucketExistence(String bucketName) {
- Bucket bucket = localStore.getBucket(bucketName);
- if (bucket == null) {
- throw new OssException(
- 404, OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not
exist. ");
- }
- }
-
- private ObjectMetadata verifyObjectExistence(String bucketName, String
filename) {
- ObjectMetadata objectMetadata = null;
- try {
- objectMetadata = localStore.getObjectMetadata(bucketName, filename);
- } catch (IOException e) {
- LOG.error(
- "Failed to get the object metadata, bucket: {}, object: {}.",
bucketName, filename, e);
- }
-
- if (objectMetadata == null) {
- throw new OssException(404, OSSErrorCode.NO_SUCH_KEY, "The specify oss
key does not exists.");
- }
-
- return objectMetadata;
- }
-
- @ControllerAdvice
- public static class OSSMockExceptionHandler extends
ResponseEntityExceptionHandler {
-
- @ExceptionHandler
- public ResponseEntity<ErrorResponse> handleOSSException(OssException ex) {
- LOG.info("Responding with status {} - {}, {}", ex.status, ex.code,
ex.message);
-
- ErrorResponse errorResponse = new ErrorResponse();
- errorResponse.setCode(ex.getCode());
- errorResponse.setMessage(ex.getMessage());
-
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_XML);
-
- return
ResponseEntity.status(ex.status).headers(headers).body(errorResponse);
- }
- }
-
- public static class OssException extends RuntimeException {
-
- private final int status;
- private final String code;
- private final String message;
-
- public OssException(final int status, final String code, final String
message) {
- super(message);
- this.status = status;
- this.code = code;
- this.message = message;
- }
-
- public String getCode() {
- return code;
- }
-
- @Override
- public String getMessage() {
- return message;
- }
- }
-
- @JsonRootName("Error")
- public static class ErrorResponse {
- @JsonProperty("Code")
- private String code;
-
- @JsonProperty("Message")
- private String message;
-
- public void setCode(String code) {
- this.code = code;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
- }
-
- /**
- * Reads bytes up to a maximum length, if its count goes above that, it
stops.
- *
- * <p>This is useful to wrap ServletInputStreams. The ServletInputStream
will block if you try to
- * read content from it that isn't there, because it doesn't know whether
the content hasn't
- * arrived yet or whether the content has finished. So, one of these,
initialized with the
- * Content-length sent in the ServletInputStream's header, will stop it
blocking, providing it's
- * been sent with a correct content length.
- *
- * <p>This code is borrowed from `org.apache.commons:commons-io`
- */
- public class BoundedInputStream extends FilterInputStream {
-
- /** The max count of bytes to read. */
- private final long maxCount;
-
- /** The count of bytes read. */
- private long count;
-
- /** The marked position. */
- private long mark = -1;
-
- /** Flag if close should be propagated. */
- private boolean propagateClose = true;
-
- /**
- * Constructs a new {@link BoundedInputStream} that wraps the given input
stream and is
- * unlimited.
- *
- * @param in The wrapped input stream.
- */
- public BoundedInputStream(final InputStream in) {
- this(in, -1);
- }
-
- /**
- * Constructs a new {@link BoundedInputStream} that wraps the given input
stream and limits it
- * to a certain size.
- *
- * @param inputStream The wrapped input stream.
- * @param maxLength The maximum number of bytes to return.
- */
- public BoundedInputStream(final InputStream inputStream, final long
maxLength) {
- // Some badly designed methods - e.g. the servlet API - overload length
- // such that "-1" means stream finished
- super(inputStream);
- this.maxCount = maxLength;
- }
-
- /** {@inheritDoc} */
- @Override
- public int available() throws IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return 0;
- }
- return in.available();
- }
-
- /**
- * Invokes the delegate's {@code close()} method if {@link
#isPropagateClose()} is {@code true}.
- *
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public void close() throws IOException {
- if (propagateClose) {
- in.close();
- }
- }
-
- /**
- * Gets the count of bytes read.
- *
- * @return The count of bytes read.
- * @since 2.12.0
- */
- public long getCount() {
- return count;
- }
-
- /**
- * Gets the max count of bytes to read.
- *
- * @return The max count of bytes to read.
- * @since 2.12.0
- */
- public long getMaxLength() {
- return maxCount;
- }
-
- private boolean isMaxLength() {
- return maxCount >= 0 && count >= maxCount;
- }
-
- /**
- * Tests whether the {@link #close()} method should propagate to the
underling {@link
- * InputStream}.
- *
- * @return {@code true} if calling {@link #close()} propagates to the
{@code close()} method of
- * the underlying stream or {@code false} if it does not.
- */
- public boolean isPropagateClose() {
- return propagateClose;
- }
-
- /**
- * Sets whether the {@link #close()} method should propagate to the
underling {@link
- * InputStream}.
- *
- * @param propagateClose {@code true} if calling {@link #close()}
propagates to the {@code
- * close()} method of the underlying stream or {@code false} if it
does not.
- */
- public void setPropagateClose(final boolean propagateClose) {
- this.propagateClose = propagateClose;
- }
-
- /**
- * Invokes the delegate's {@code mark(int)} method.
- *
- * @param readlimit read ahead limit
- */
- @Override
- public synchronized void mark(final int readlimit) {
- in.mark(readlimit);
- mark = count;
- }
-
- /**
- * Invokes the delegate's {@code markSupported()} method.
- *
- * @return true if mark is supported, otherwise false
- */
- @Override
- public boolean markSupported() {
- return in.markSupported();
- }
-
- /**
- * A caller has caused a request that would cross the {@code maxLength}
boundary.
- *
- * @param maxLength The max count of bytes to read.
- * @param bytesRead The count of bytes read.
- * @throws IOException Subclasses may throw.
- * @since 2.12.0
- */
- protected void onMaxLength(final long maxLength, final long bytesRead)
throws IOException {
- // for subclasses
- }
-
- /**
- * Invokes the delegate's {@code read()} method if the current position is
less than the limit.
- *
- * @return the byte read or -1 if the end of stream or the limit has been
reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read() throws IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return -1;
- }
- final int result = in.read();
- count++;
- return result;
- }
-
- /**
- * Invokes the delegate's {@code read(byte[])} method.
- *
- * @param b the buffer to read the bytes into
- * @return the number of bytes read or -1 if the end of stream or the
limit has been reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read(final byte[] b) throws IOException {
- return this.read(b, 0, b.length);
- }
-
- /**
- * Invokes the delegate's {@code read(byte[], int, int)} method.
- *
- * @param b the buffer to read the bytes into
- * @param off The start offset
- * @param len The number of bytes to read
- * @return the number of bytes read or -1 if the end of stream or the
limit has been reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read(final byte[] b, final int off, final int len) throws
IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return -1;
- }
- final long maxRead = maxCount >= 0 ? Math.min(len, maxCount - count) :
len;
- final int bytesRead = in.read(b, off, (int) maxRead);
-
- if (bytesRead == -1) {
- return -1;
- }
-
- count += bytesRead;
- return bytesRead;
- }
-
- /**
- * Invokes the delegate's {@code reset()} method.
- *
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public synchronized void reset() throws IOException {
- in.reset();
- count = mark;
- }
-
- /**
- * Invokes the delegate's {@code skip(long)} method.
- *
- * @param n the number of bytes to skip
- * @return the actual number of bytes skipped
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public long skip(final long n) throws IOException {
- final long toSkip = maxCount >= 0 ? Math.min(n, maxCount - count) : n;
- final long skippedBytes = in.skip(toSkip);
- count += skippedBytes;
- return skippedBytes;
- }
-
- /**
- * Invokes the delegate's {@code toString()} method.
- *
- * @return the delegate's {@code toString()}
- */
- @Override
- public String toString() {
- return in.toString();
- }
- }
-}
diff --git
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
index f7a4b72e4b..521b87e31e 100644
---
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
+++
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
@@ -46,11 +46,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.MediaType;
-import org.springframework.stereotype.Component;
-@Component
public class AliyunOSSMockLocalStore {
private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSMockLocalStore.class);
@@ -61,8 +57,7 @@ public class AliyunOSSMockLocalStore {
private final ObjectMapper objectMapper = new ObjectMapper();
- public AliyunOSSMockLocalStore(
- @Value("${" + AliyunOSSMockApp.PROP_ROOT_DIR + ":}") String rootDir) {
+ public AliyunOSSMockLocalStore(String rootDir) {
Preconditions.checkNotNull(rootDir, "Root directory cannot be null");
this.root = new File(rootDir);
@@ -121,8 +116,7 @@ public class AliyunOSSMockLocalStore {
File dir = new File(root, bucket.getName());
if (Files.walk(dir.toPath()).anyMatch(p -> p.toFile().isFile())) {
- throw new AliyunOSSMockLocalController.OssException(
- 409, OSSErrorCode.BUCKET_NOT_EMPTY, "The bucket you tried to delete
is not empty. ");
+ throw new RuntimeException(OSSErrorCode.BUCKET_NOT_EMPTY);
}
try (Stream<Path> walk = Files.walk(dir.toPath())) {
@@ -156,7 +150,9 @@ public class AliyunOSSMockLocalStore {
metadata.setContentLength(dataFile.length());
metadata.setContentMD5(md5sum(dataFile.getAbsolutePath()));
metadata.setContentType(
- contentType != null ? contentType :
MediaType.APPLICATION_OCTET_STREAM_VALUE);
+ contentType != null
+ ? contentType
+ : "application/octet-stream"); // = MediaType.APPLICATION_OCTET_STREAM_VALUE
metadata.setContentEncoding(contentEncoding);
metadata.setDataFile(dataFile.getAbsolutePath());
metadata.setMetaFile(metaFile.getAbsolutePath());
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java
b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java
deleted file mode 100644
index ff66e5c2a1..0000000000
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.aliyun.oss.mock;
-
-public class Range {
-
- private final long start;
- private final long end;
-
- public Range(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- public long start() {
- return start;
- }
-
- public long end() {
- return end;
- }
-
- @Override
- public String toString() {
- return String.format("%d-%d", start, end);
- }
-}
diff --git a/build.gradle b/build.gradle
index a654e1cba5..abab68ca4b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -438,16 +438,6 @@ project(':iceberg-aliyun') {
testImplementation platform(libs.jackson.bom)
testImplementation
"com.fasterxml.jackson.dataformat:jackson-dataformat-xml"
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
- testImplementation libs.spring.web
- testImplementation(libs.spring.boot.starter.jetty) {
- exclude module: 'logback-classic'
- exclude group: 'org.eclipse.jetty.websocket', module:
'javax-websocket-server-impl'
- exclude group: 'org.eclipse.jetty.websocket', module: 'websocket-server'
- }
- testImplementation(libs.spring.boot.starter.web) {
- exclude module: 'logback-classic'
- exclude module: 'spring-boot-starter-logging'
- }
}
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 55aa97c350..c84341dea9 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -84,8 +84,6 @@ snowflake-jdbc = "3.19.1"
spark-hive33 = "3.3.4"
spark-hive34 = "3.4.4"
spark-hive35 = "3.5.2"
-spring-boot = "2.7.18"
-spring-web = "5.3.39"
sqlite-jdbc = "3.47.0.0"
testcontainers = "1.20.3"
tez010 = "0.10.4"
@@ -219,9 +217,6 @@ nessie-jaxrs-testextension = { module =
"org.projectnessie.nessie:nessie-jaxrs-t
nessie-versioned-storage-inmemory-tests = { module =
"org.projectnessie.nessie:nessie-versioned-storage-inmemory-tests", version.ref
= "nessie" }
nessie-versioned-storage-testextension = { module =
"org.projectnessie.nessie:nessie-versioned-storage-testextension", version.ref
= "nessie" }
orc-tools = { module = "org.apache.orc:orc-tools", version.ref = "orc" }
-spring-boot-starter-jetty = { module =
"org.springframework.boot:spring-boot-starter-jetty", version.ref =
"spring-boot" }
-spring-boot-starter-web = { module =
"org.springframework.boot:spring-boot-starter-web", version.ref = "spring-boot"
}
-spring-web = { module = "org.springframework:spring-web", version.ref =
"spring-web" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc"
}
testcontainers = { module = "org.testcontainers:testcontainers", version.ref =
"testcontainers" }
testcontainers-junit-jupiter = { module = "org.testcontainers:junit-jupiter",
version.ref = "testcontainers" }