This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new ae6971abe4 [ASTERIXDB-3288][RT] Introduce COPY TO runtime
ae6971abe4 is described below
commit ae6971abe47a832dfcff30caa82af9ee75189ede
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue Oct 24 17:08:35 2023 -0700
[ASTERIXDB-3288][RT] Introduce COPY TO runtime
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Introduce the necessary writers for COPY TO
Change-Id: Ie107e707189bdb7fc17b09439a08d47192cfa61f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17878
Integration-Tests: Jenkins <[email protected]>
Tested-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
asterixdb/asterix-cloud/pom.xml | 5 +
.../asterix/cloud/AbstractCloudIOManager.java | 2 +-
.../org/apache/asterix/cloud/CloudFileHandle.java | 2 +-
...eBufferProvider.java => CloudOutputStream.java} | 36 ++---
.../asterix/cloud/CloudResettableInputStream.java | 34 +++--
...fferProvider.java => IWriteBufferProvider.java} | 24 +---
.../apache/asterix/cloud/WriteBufferProvider.java | 7 +-
...ovider.java => WriterSingleBufferProvider.java} | 29 ++--
.../cloud/clients/aws/s3/S3ClientConfig.java | 22 ++-
.../cloud/clients/aws/s3/S3CloudClient.java | 17 ++-
.../s3/{S3Utils.java => S3CloudClientUtils.java} | 6 +-
.../cloud/writer/CloudExternalFileWriter.java | 75 +++++++++++
.../cloud/writer/S3ExternalFileWriterFactory.java | 150 +++++++++++++++++++++
.../external/writer/LocalFSExternalFileWriter.java | 73 ++++++++++
.../writer/LocalFSExternalFileWriterFactory.java | 50 +++++++
.../GzipExternalFileCompressStreamFactory.java | 42 ++++++
.../IExternalFileCompressStreamFactory.java} | 38 +++---
.../NoOpExternalFileCompressStreamFactory.java} | 29 ++--
.../writer/printer/TextualExternalFilePrinter.java | 66 +++++++++
.../printer/TextualExternalFilePrinterFactory.java | 41 ++++++
.../evaluators/functions/PointableHelper.java | 48 ++++---
.../runtime/writer/AbstractPathResolver.java | 83 ++++++++++++
.../runtime/writer/DynamicPathResolver.java | 63 +++++++++
.../asterix/runtime/writer/ExternalWriter.java | 68 ++++++++++
.../runtime/writer/ExternalWriterFactory.java | 70 ++++++++++
.../IExternalFileFilterWriterFactoryProvider.java} | 29 +---
.../runtime/writer/IExternalFilePrinter.java} | 49 ++++---
.../writer/IExternalFilePrinterFactory.java} | 34 ++---
.../runtime/writer/IExternalFileWriter.java | 57 ++++++++
.../runtime/writer/IExternalFileWriterFactory.java | 51 +++++++
.../asterix/runtime/writer/IPathResolver.java} | 40 +++---
.../runtime/writer/StaticPathResolver.java} | 31 ++---
.../writer/SinkExternalWriterRuntime.java | 114 ++++++++++++++++
.../writer/SinkExternalWriterRuntimeFactory.java | 60 +++++++++
.../runtime/writers/IExternalWriter.java | 57 ++++++++
.../runtime/writers/IExternalWriterFactory.java | 39 +++---
.../apache/hyracks/util/string/UTF8CharBuffer.java | 35 +++--
.../apache/hyracks/util/string/UTF8StringUtil.java | 25 +++-
38 files changed, 1403 insertions(+), 298 deletions(-)
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 9e840c2de9..79ee4c1e4c 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -70,6 +70,11 @@
<artifactId>asterix-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- aws s3 start -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 8c7cd8a4f1..dc8bc6862a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -59,7 +59,7 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
//TODO(DB): change
private final String metadataNamespacePath;
protected final ICloudClient cloudClient;
- protected final WriteBufferProvider writeBufferProvider;
+ protected final IWriteBufferProvider writeBufferProvider;
protected final String bucket;
protected final Set<Integer> partitions;
protected final List<FileReference> partitionPaths;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 8572014a57..14c44ad5ad 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -30,7 +30,7 @@ public class CloudFileHandle extends FileHandle {
private final CloudResettableInputStream inputStream;
public CloudFileHandle(ICloudClient cloudClient, String bucket,
FileReference fileRef,
- WriteBufferProvider bufferProvider) {
+ IWriteBufferProvider bufferProvider) {
super(fileRef);
ICloudBufferedWriter bufferedWriter =
cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
inputStream = new CloudResettableInputStream(bufferedWriter,
bufferProvider);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
similarity index 54%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
index 5e49be3674..18a97ad985 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -18,28 +18,32 @@
*/
package org.apache.asterix.cloud;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.IOException;
+import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+public class CloudOutputStream extends OutputStream {
+ private final CloudResettableInputStream inputStream;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
+ public CloudOutputStream(CloudResettableInputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ inputStream.write(b, off, len);
+ }
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
+ @Override
+ public void write(int b) throws IOException {
+ inputStream.write(b);
}
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
+ @Override
+ public void close() throws IOException {
+ inputStream.finish();
}
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
+ public void abort() throws IOException {
+ inputStream.abort();
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 7ba95c5087..f19800155e 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -24,16 +24,19 @@ import java.nio.ByteBuffer;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class CloudResettableInputStream extends InputStream {
+ private static final Logger LOGGER = LogManager.getLogger();
// TODO: make configurable
public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
- private final WriteBufferProvider bufferProvider;
+ private final IWriteBufferProvider bufferProvider;
private ByteBuffer writeBuffer;
private final ICloudBufferedWriter bufferedWriter;
- public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter,
WriteBufferProvider bufferProvider) {
+ public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter,
IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
this.bufferProvider = bufferProvider;
}
@@ -61,16 +64,24 @@ public class CloudResettableInputStream extends InputStream
{
}
public void write(ByteBuffer header, ByteBuffer page) throws
HyracksDataException {
- open();
write(header);
write(page);
}
public int write(ByteBuffer page) throws HyracksDataException {
open();
+ return write(page.array(), 0, page.limit());
+ }
+
+ public void write(int b) throws HyracksDataException {
+ if (writeBuffer.remaining() == 0) {
+ uploadAndWait();
+ }
+ writeBuffer.put((byte) b);
+ }
- // amount to write
- int size = page.limit();
+ public int write(byte[] b, int off, int len) throws HyracksDataException {
+ open();
// full buffer = upload -> write all
if (writeBuffer.remaining() == 0) {
@@ -78,23 +89,23 @@ public class CloudResettableInputStream extends InputStream
{
}
// write partial -> upload -> write -> upload -> ...
- int offset = 0;
- int pageRemaining = size;
+ int offset = off;
+ int pageRemaining = len;
while (pageRemaining > 0) {
// enough to write all
if (writeBuffer.remaining() > pageRemaining) {
- writeBuffer.put(page.array(), offset, pageRemaining);
- return size;
+ writeBuffer.put(b, offset, pageRemaining);
+ return len;
}
int remaining = writeBuffer.remaining();
- writeBuffer.put(page.array(), offset, remaining);
+ writeBuffer.put(b, offset, remaining);
pageRemaining -= remaining;
offset += remaining;
uploadAndWait();
}
- return size;
+ return len;
}
public void finish() throws HyracksDataException {
@@ -128,6 +139,7 @@ public class CloudResettableInputStream extends InputStream
{
try {
bufferedWriter.upload(this, writeBuffer.limit());
} catch (Exception e) {
+ LOGGER.fatal(e);
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
similarity index 55%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
index 5e49be3674..693b73adb2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
@@ -18,28 +18,10 @@
*/
package org.apache.asterix.cloud;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
-
import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
+public interface IWriteBufferProvider {
+ ByteBuffer getBuffer();
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+ void recycle(ByteBuffer buffer);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
index 5e49be3674..ee174000e6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
@@ -24,17 +24,22 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-public class WriteBufferProvider {
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class WriteBufferProvider implements IWriteBufferProvider {
private final BlockingQueue<ByteBuffer> writeBuffers;
public WriteBufferProvider(int ioParallelism) {
writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
}
+ @Override
public void recycle(ByteBuffer buffer) {
writeBuffers.offer(buffer);
}
+ @Override
public ByteBuffer getBuffer() {
ByteBuffer writeBuffer = writeBuffers.poll();
if (writeBuffer == null) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
similarity index 67%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
index 5e49be3674..287900d907 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
@@ -21,25 +21,26 @@ package org.apache.asterix.cloud;
import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
+@NotThreadSafe
+public class WriterSingleBufferProvider implements IWriteBufferProvider {
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
+ private final ByteBuffer buffer;
+
+ public WriterSingleBufferProvider() {
+ buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);
}
+ @Override
public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
+ buffer.clear();
+ return buffer;
+ }
+
+ @Override
+ public void recycle(ByteBuffer buffer) {
+ // NoOp
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 56ed3cfb4b..a2aa21c7cb 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,13 +18,18 @@
*/
package org.apache.asterix.cloud.clients.aws.s3;
+import java.io.Serializable;
+import java.util.Map;
+
import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-public class S3ClientConfig {
+public final class S3ClientConfig implements Serializable {
+ private static final long serialVersionUID = 548292720313565948L;
// The maximum number of files that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
@@ -48,6 +53,21 @@ public class S3ClientConfig {
cloudProperties.getProfilerLogInterval());
}
+ public static S3ClientConfig of(Map<String, String> configuration) {
+ // Used to determine local vs. actual S3
+ String endPoint =
configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME);
+ // Disabled
+ long profilerLogInterval = 0;
+
+ // Dummy values;
+ String region = "";
+ String prefix = null;
+ boolean anonymousAuth = false;
+
+ return new S3ClientConfig(region,
configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME), "",
+ anonymousAuth, profilerLogInterval);
+ }
+
public String getRegion() {
return region;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 02e1d4f0ec..b47a6ffe57 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -19,8 +19,8 @@
package org.apache.asterix.cloud.clients.aws.s3;
import static
org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.encodeURI;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.listS3Objects;
+import static
org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.encodeURI;
+import static
org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.listS3Objects;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -45,6 +45,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,21 +68,25 @@ import
software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
+@ThreadSafe
public class S3CloudClient implements ICloudClient {
private final S3ClientConfig config;
private final S3Client s3Client;
private final IRequestProfiler profiler;
public S3CloudClient(S3ClientConfig config) {
+ this(config, buildClient(config));
+ }
+
+ public S3CloudClient(S3ClientConfig config, S3Client s3Client) {
this.config = config;
- s3Client = buildClient();
+ this.s3Client = s3Client;
long profilerInterval = config.getProfilerLogInterval();
if (profilerInterval > 0) {
profiler = new CountRequestProfiler(profilerInterval);
} else {
profiler = NoOpRequestProfiler.INSTANCE;
}
-
}
@Override
@@ -245,7 +250,7 @@ public class S3CloudClient implements ICloudClient {
s3Client.close();
}
- private S3Client buildClient() {
+ private static S3Client buildClient(S3ClientConfig config) {
S3ClientBuilder builder = S3Client.builder();
builder.credentialsProvider(config.createCredentialsProvider());
builder.region(Region.of(config.getRegion()));
@@ -264,7 +269,7 @@ public class S3CloudClient implements ICloudClient {
private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter
filter) {
Set<String> files = new HashSet<>();
for (S3Object s3Object : contents) {
- String path = config.isLocalS3Provider() ?
S3Utils.decodeURI(s3Object.key()) : s3Object.key();
+ String path = config.isLocalS3Provider() ?
S3CloudClientUtils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
files.add(path);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
similarity index 96%
rename from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
rename to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
index 8901dec73a..82b0ae1fd0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
@@ -31,9 +31,9 @@ import
software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
-public class S3Utils {
+public class S3CloudClientUtils {
- private S3Utils() {
+ private S3CloudClientUtils() {
throw new AssertionError("do not instantiate");
}
@@ -78,7 +78,7 @@ public class S3Utils {
return URLDecoder.decode(path, Charset.defaultCharset());
}
- public static String toCloudPrefix(String path) {
+ private static String toCloudPrefix(String path) {
return path.startsWith(File.separator) ? path.substring(1) : path;
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
new file mode 100644
index 0000000000..22095ca835
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.CloudOutputStream;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class CloudExternalFileWriter implements IExternalFileWriter {
+ private final IExternalFilePrinter printer;
+ private final ICloudClient cloudClient;
+ private final String bucket;
+ private final IWriteBufferProvider bufferProvider;
+ private ICloudBufferedWriter bufferedWriter;
+
+ public CloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient
cloudClient, String bucket) {
+ this.printer = printer;
+ this.cloudClient = cloudClient;
+ this.bucket = bucket;
+ bufferProvider = new WriterSingleBufferProvider();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void newFile(String path) throws HyracksDataException {
+ bufferedWriter = cloudClient.createBufferedWriter(bucket, path);
+ CloudResettableInputStream inputStream = new
CloudResettableInputStream(bufferedWriter, bufferProvider);
+
+ CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+ printer.newStream(outputStream);
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ printer.print(value);
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ bufferedWriter.abort();
+ printer.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ printer.close();
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
new file mode 100644
index 0000000000..204d5da199
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import
org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
+
+public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFactory {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 4551318140901866805L;
+ public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
S3ExternalFileWriterFactory::new;
+ private final S3ClientConfig config;
+ private final Map<String, String> configuration;
+ private transient S3CloudClient cloudClient;
+
+ private S3ExternalFileWriterFactory(Map<String, String> configuration) {
+ this.config = S3ClientConfig.of(configuration);
+ this.configuration = configuration;
+ cloudClient = null;
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalFilePrinterFactory printerFactory)
+ throws HyracksDataException {
+ buildClient();
+ String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ return new CloudExternalFileWriter(printerFactory.createPrinter(),
cloudClient, bucket);
+ }
+
+ private void buildClient() throws HyracksDataException {
+ try {
+ synchronized (this) {
+ if (cloudClient == null) {
+ // only a single client should be built
+ cloudClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
+ }
+ }
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public char getFileSeparator() {
+ return '/';
+ }
+
+ @Override
+ public void validate() throws AlgebricksException {
+ ICloudClient testClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
+ String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ if (bucket == null || bucket.isEmpty()) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+ ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ }
+ try {
+ doValidate(testClient, bucket);
+ } catch (IOException e) {
+ if (e.getCause() instanceof NoSuchBucketException) {
+ throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
+ } else {
+ LOGGER.fatal(e);
+ throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ ExceptionUtils.getMessageOrToString(e));
+ }
+ }
+ }
+
+ private static void doValidate(ICloudClient testClient, String bucket)
throws IOException {
+ Random random = new Random();
+ String pathPrefix = "testFile";
+ String path = pathPrefix + random.nextInt();
+ while (testClient.exists(bucket, path)) {
+ path = pathPrefix + random.nextInt();
+ }
+
+ long writeValue = random.nextLong();
+ byte[] data = new byte[Long.BYTES];
+ LongPointable.setLong(data, 0, writeValue);
+ ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket,
path);
+ CloudResettableInputStream stream = null;
+ boolean aborted = false;
+ try {
+ stream = new CloudResettableInputStream(writer, new
WriterSingleBufferProvider());
+ stream.write(data, 0, data.length);
+ } catch (HyracksDataException e) {
+ stream.abort();
+ } finally {
+ if (stream != null && !aborted) {
+ stream.finish();
+ stream.close();
+ }
+ }
+
+ try {
+ long readValue =
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+ if (writeValue != readValue) {
+ // This should never happen unless S3 is messed up. But log
for sanity check
+ LOGGER.warn(
+ "The writer can write but the written values wasn't
successfully read back (wrote: {}, read:{})",
+ writeValue, readValue);
+ }
+ } finally {
+ // Delete the written file
+ testClient.deleteObjects(bucket, Collections.singleton(path));
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
new file mode 100644
index 0000000000..e8983d8741
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalFileWriter} that writes values to files on the local file system through an
+ * {@link IExternalFilePrinter}.
+ */
+final class LocalFSExternalFileWriter implements IExternalFileWriter {
+    private final IExternalFilePrinter printer;
+
+    LocalFSExternalFileWriter(IExternalFilePrinter printer) {
+        this.printer = printer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.open();
+    }
+
+    /**
+     * Replaces any existing file at {@code path} with a new empty file (creating parent directories as needed)
+     * and directs the printer to it.
+     *
+     * @param path path of the file to create
+     * @throws HyracksDataException if the file cannot be created or the printer cannot switch to it
+     */
+    @Override
+    public void newFile(String path) throws HyracksDataException {
+        BufferedOutputStream fileStream = null;
+        try {
+            File currentFile = new File(path);
+            if (currentFile.exists()) {
+                currentFile.delete();
+            }
+            FileUtils.createParentDirectories(currentFile);
+            currentFile.createNewFile();
+            fileStream = new BufferedOutputStream(new FileOutputStream(currentFile));
+            printer.newStream(fileStream);
+        } catch (IOException e) {
+            // The printer did not take the stream over; release the file handle instead of leaking it
+            closeAfterFailure(fileStream, e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        printer.print(value);
+    }
+
+    @Override
+    public void abort() {
+        printer.close();
+    }
+
+    @Override
+    public void close() {
+        printer.close();
+    }
+
+    /**
+     * Closes {@code stream} (if it was opened) and records any close failure as suppressed on {@code failure}.
+     */
+    private static void closeAfterFailure(BufferedOutputStream stream, IOException failure) {
+        if (stream == null) {
+            return;
+        }
+        try {
+            stream.close();
+        } catch (IOException closeFailure) {
+            failure.addSuppressed(closeFailure);
+        }
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
new file mode 100644
index 0000000000..d206b2aff0
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.File;
+
+import
org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory of {@link IExternalFileWriter}s that write to the local file system.
+ */
+public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final long serialVersionUID = 871685327574547749L;
+    // The provider's argument is not needed to build a local file system factory
+    public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
+            unused -> new LocalFSExternalFileWriterFactory();
+
+    private LocalFSExternalFileWriterFactory() {
+        // Instances are only handed out through PROVIDER
+    }
+
+    /**
+     * Creates a local file system writer backed by a freshly created printer.
+     */
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory) {
+        return new LocalFSExternalFileWriter(printerFactory.createPrinter());
+    }
+
+    /**
+     * @return the platform's file separator character
+     */
+    @Override
+    public char getFileSeparator() {
+        return File.separatorChar;
+    }
+
+    @Override
+    public void validate() {
+        // NoOp
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000000..5ef196d190
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IExternalFileCompressStreamFactory} that wraps the destination stream in a
+ * {@link GzipCompressorOutputStream}.
+ */
+public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
+    private static final long serialVersionUID = -7364595253362922025L;
+    // Singleton; 'final' so the shared instance cannot be reassigned
+    public static final IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+
+    private GzipExternalFileCompressStreamFactory() {
+    }
+
+    /**
+     * @param outputStream destination output stream
+     * @return a gzip-compressing stream that writes to {@code outputStream}
+     * @throws HyracksDataException if the gzip stream cannot be created
+     */
+    @Override
+    public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
+        try {
+            return new GzipCompressorOutputStream(outputStream);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
index 5e49be3674..e46e5c08f8 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
@@ -16,30 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.external.writer.compressor;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.OutputStream;
+import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
+/**
+ * Creates a compression {@link OutputStream}
+ */
+@FunctionalInterface
+public interface IExternalFileCompressStreamFactory extends Serializable {
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+ /**
+ * Create a compressed stream before writing to the provided stream
+ *
+ * @param outputStream destination output stream
+ * @return compressing stream
+ * @throws HyracksDataException if the compressing stream cannot be created
+ */
+ OutputStream createStream(OutputStream outputStream) throws
HyracksDataException;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
similarity index 52%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
index 5e49be3674..8876e0f573 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
@@ -16,30 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.external.writer.compressor;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+public class NoOpExternalFileCompressStreamFactory implements
IExternalFileCompressStreamFactory {
+ private static final long serialVersionUID = -2211142209501287615L;
+ public static final IExternalFileCompressStreamFactory INSTANCE = new
NoOpExternalFileCompressStreamFactory();
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
+ private NoOpExternalFileCompressStreamFactory() {
}
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
+ @Override
+ public OutputStream createStream(OutputStream outputStream) {
+ // No compression: hand back the destination stream as-is
+ return outputStream;
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
new file mode 100644
index 0000000000..bff0db87b6
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalFilePrinter} that prints each value followed by a line break into a {@link PrintStream},
+ * which wraps the stream produced by the configured {@link IExternalFileCompressStreamFactory}.
+ */
+final class TextualExternalFilePrinter implements IExternalFilePrinter {
+    private final IPrinter printer;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    // Stream of the file currently being written; null until the first call to newStream()
+    private PrintStream printStream;
+
+    TextualExternalFilePrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory) {
+        this.printer = printer;
+        this.compressStreamFactory = compressStreamFactory;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.init();
+    }
+
+    /**
+     * Closes the stream of the previous file (if any) and starts printing to {@code outputStream}.
+     *
+     * @param outputStream destination of the subsequently printed values
+     * @throws HyracksDataException  if the compressing stream cannot be created
+     * @throws IllegalStateException if the previous stream recorded a print error
+     */
+    @Override
+    public void newStream(OutputStream outputStream) throws HyracksDataException {
+        // Go through close() so that write errors of the previous file are reported rather than dropped
+        close();
+        printStream = new PrintStream(compressStreamFactory.createStream(outputStream));
+    }
+
+    @Override
+    public void print(IValueReference value) throws HyracksDataException {
+        printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+        printStream.println();
+    }
+
+    /**
+     * Closes the current stream, if one was opened.
+     *
+     * @throws IllegalStateException if the stream recorded a print error
+     */
+    @Override
+    public void close() {
+        if (printStream == null) {
+            // Nothing was opened yet (e.g., close/abort before the first file)
+            return;
+        }
+        printStream.close();
+        // PrintStream does not throw on I/O failures; it records them in its error flag
+        if (printStream.checkError()) {
+            throw new IllegalStateException("Print error. Check the logs for more information");
+        }
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
new file mode 100644
index 0000000000..6778532526
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+/**
+ * Creates {@link TextualExternalFilePrinter}s from a value printer factory and a compression stream factory.
+ */
+public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
+    private static final long serialVersionUID = 9155959967258587588L;
+    private final IPrinterFactory printerFactory;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+
+    /**
+     * @param printerFactory        factory of the printer used to print each value
+     * @param compressStreamFactory factory of the (possibly compressing) stream wrapping each output stream
+     */
+    public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
+            IExternalFileCompressStreamFactory compressStreamFactory) {
+        this.compressStreamFactory = compressStreamFactory;
+        this.printerFactory = printerFactory;
+    }
+
+    /**
+     * @return a new textual printer backed by a newly created value printer
+     */
+    @Override
+    public IExternalFilePrinter createPrinter() {
+        return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
index 7a017bdf85..deb0da9ee2 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
@@ -62,6 +62,7 @@ public class PointableHelper {
private final UTF8StringWriter utf8Writer;
public static final IPointable NULL_REF = new VoidPointable();
+
static {
NULL_REF.set(NULL_BYTES, 0, NULL_BYTES.length);
}
@@ -139,12 +140,9 @@ public class PointableHelper {
}
/**
- * @param str
- * The input string
- * @param vs
- * The storage buffer
- * @param writeTag
- * Specifying whether a tag for the string should also be
written
+ * @param str The input string
+ * @param vs The storage buffer
+ * @param writeTag Specifying whether a tag for the string should also be
written
*/
public void serializeString(String str, IMutableValueStorage vs, boolean
writeTag) throws HyracksDataException {
vs.reset();
@@ -219,22 +217,21 @@ public class PointableHelper {
* This method takes multiple pointables, the first pointable being the
pointable to write the result to, and
* checks their ATypeTag value. If a missing or null ATypeTag is
encountered, the method will set the result
* pointable to missing or null accordingly, and will return {@code true}.
- *
+ * <p>
* As the missing encounter has a higher priority than the null, the
method will keep checking if any missing has
* been encountered first, if not, it will do a null check at the end.
- *
+ * <p>
* If the listAccessor is passed, this method will also go through any
list pointable elements and search for
* a missing value to give it a higher priority over null values. If
{@code null} is passed for the listAccessor,
* the list element check will be skipped.
*
- * @param result the result pointable that will hold the data
+ * @param result the result pointable that will hold the data
* @param listAccessor list accessor to use for check list elements.
- * @param pointable1 the first pointable to be checked
- * @param pointable2 the second pointable to be checked
- * @param pointable3 the third pointable to be checked
- * @param pointable4 the fourth pointable to be checked
- * @param pointable5 the fourth pointable to be checked
- *
+ * @param pointable1 the first pointable to be checked
+ * @param pointable2 the second pointable to be checked
+ * @param pointable3 the third pointable to be checked
+ * @param pointable4 the fourth pointable to be checked
+ * @param pointable5 the fifth pointable to be checked
* @return {@code true} if the pointable value is missing or null, {@code
false} otherwise.
*/
public static boolean checkAndSetMissingOrNull(IPointable result,
ListAccessor listAccessor, IPointable pointable1,
@@ -311,9 +308,8 @@ public class PointableHelper {
* Checks whether the pointable {@param pointable1} is null or missing,
and if true, assigns null to the
* {@param result}.
*
- * @param result the result pointable that will hold the null value
+ * @param result the result pointable that will hold the null value
* @param pointable1 the pointable to be checked
- *
* @return {@code true} if the {@param pointable1} value is missing or
null, {@code false} otherwise.
*/
public static boolean checkAndSetNull(IPointable result, IPointable
pointable1) throws HyracksDataException {
@@ -331,10 +327,9 @@ public class PointableHelper {
* Checks whether any pointable argument is null or missing, and if true,
assigns null to the
* {@param result}.
*
- * @param result the result pointable that will hold the null value
+ * @param result the result pointable that will hold the null value
* @param pointable1 the pointable to be checked
* @param pointable2 the pointable to be checked
- *
* @return {@code true} if any pointable is missing or null, {@code false}
otherwise.
*/
public static boolean checkAndSetNull(IPointable result, IPointable
pointable1, IPointable pointable2)
@@ -344,14 +339,13 @@ public class PointableHelper {
/**
* This method checks and returns the pointable value state.
- *
+ * <p>
* If a ListAccessor is passed to this function, it will check if the
passed pointable is a list, and if so, it
* will search for a missing value inside the list before checking for
null values. If the listAccessor value is
* null, no list elements check will be performed.
*
- * @param pointable the pointable to be checked
+ * @param pointable the pointable to be checked
* @param listAccessor list accessor used to check the list elements.
- *
* @return the pointable value state for the passed pointable
*/
private static PointableValueState getPointableValueState(IPointable
pointable, ListAccessor listAccessor)
@@ -400,10 +394,9 @@ public class PointableHelper {
* Check if the provided bytes are of valid long type. In case floats and
doubles are accepted, the accepted
* values will be 1.0 and 2.0, but not 2.5. (only zero decimals)
*
- * @param bytes data bytes
- * @param startOffset start offset
+ * @param bytes data bytes
+ * @param startOffset start offset
* @param acceptFloatAndDouble flag to accept float and double values or
not
- *
* @return true if provided value is a valid long, false otherwise
*/
public static boolean isValidLongValue(byte[] bytes, int startOffset,
boolean acceptFloatAndDouble) {
@@ -443,4 +436,9 @@ public class PointableHelper {
return true;
}
+
+    /**
+     * Checks whether the type tag of the provided value is MISSING or NULL.
+     *
+     * @param value a tagged value; its type tag is read from the byte at {@code value.getStartOffset()}
+     * @return {@code true} if the type tag is MISSING or NULL, {@code false} otherwise
+     */
+    public static boolean isNullOrMissing(IValueReference value) {
+        // The value starts at its start offset within the backing array, which is not necessarily index 0
+        byte typeTag = value.getByteArray()[value.getStartOffset()];
+        return ATypeTag.MISSING.serialize() == typeTag || ATypeTag.NULL.serialize() == typeTag;
+    }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
new file mode 100644
index 0000000000..88153dea92
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Builds file paths of the form {@code <prefix><separator><partition>-<jobId>-<fileCounter>[.<extension>]},
+ * where the prefix is supplied by the subclass via {@link #appendPrefix(StringBuilder, IFrameTupleReference)}.
+ */
+abstract class AbstractPathResolver implements IPathResolver {
+    // TODO is 4-digits enough?
+    // TODO do we need jobId?
+    // Minimum zero-padded width of the partition number, jobId, and file counter (larger values use more digits)
+    private static final int NUMBER_OF_DIGITS = 4;
+    private static final String FILE_FORMAT = "%0" + NUMBER_OF_DIGITS + "d";
+
+    private final String fileExtension;
+    private final char fileSeparator;
+    private final int partition;
+    private final long jobId;
+    private final StringBuilder pathStringBuilder;
+    // Length of the current path up to (and excluding) the file counter
+    private int fileCounterOffset;
+    private int fileCounter;
+
+    AbstractPathResolver(String fileExtension, char fileSeparator, int partition, long jobId) {
+        this.fileExtension = fileExtension;
+        this.fileSeparator = fileSeparator;
+        this.partition = partition;
+        this.jobId = jobId;
+        pathStringBuilder = new StringBuilder();
+        fileCounterOffset = 0;
+        fileCounter = 0;
+    }
+
+    /**
+     * Resets the file counter and returns the path of the first file of the partition the tuple belongs to.
+     *
+     * @param tuple tuple handed to {@link #appendPrefix(StringBuilder, IFrameTupleReference)}
+     * @return path of the first file
+     */
+    @Override
+    public final String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException {
+        fileCounter = 0;
+        pathStringBuilder.setLength(0);
+        appendPrefix(pathStringBuilder, tuple);
+        int prefixLength = pathStringBuilder.length();
+        // An empty prefix gets no separator (and must not be indexed at -1)
+        if (prefixLength > 0 && pathStringBuilder.charAt(prefixLength - 1) != fileSeparator) {
+            pathStringBuilder.append(fileSeparator);
+        }
+        pathStringBuilder.append(String.format(FILE_FORMAT, partition));
+        pathStringBuilder.append('-');
+        pathStringBuilder.append(String.format(FILE_FORMAT, jobId));
+        pathStringBuilder.append('-');
+        // Remember where the counter starts so that getNextPath() can replace a counter of any width
+        fileCounterOffset = pathStringBuilder.length();
+        return appendFileCounterAndExtension();
+    }
+
+    /**
+     * Returns the path of the next file: the last returned path with its file counter incremented.
+     *
+     * @return path of the next file
+     */
+    @Override
+    public final String getNextPath() {
+        // Truncate at the recorded offset rather than by a fixed width: the counter exceeds
+        // NUMBER_OF_DIGITS digits once it reaches 10^NUMBER_OF_DIGITS
+        pathStringBuilder.setLength(fileCounterOffset);
+        return appendFileCounterAndExtension();
+    }
+
+    /**
+     * Appends the current file counter (then increments it) and the file extension, if any, to the path.
+     */
+    private String appendFileCounterAndExtension() {
+        pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+        if (fileExtension != null && !fileExtension.isEmpty()) {
+            pathStringBuilder.append('.');
+            pathStringBuilder.append(fileExtension);
+        }
+        return pathStringBuilder.toString();
+    }
+
+    /**
+     * Appends the path prefix (everything before the generated file name) for the given tuple.
+     */
+    abstract void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
new file mode 100644
index 0000000000..dbd97e4320
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.UTFDataFormatException;
+
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.util.string.UTF8CharBuffer;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+/**
+ * A path resolver whose prefix is computed per tuple by evaluating a path expression. When the expression
+ * evaluates to NULL or MISSING, the configured fallback path is used instead.
+ */
+final class DynamicPathResolver extends AbstractPathResolver {
+    private final IScalarEvaluator pathEval;
+    private final String inappropriatePartitionPath;
+    private final VoidPointable pathResult;
+    private final UTF8CharBuffer charBuffer;
+
+    DynamicPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, IScalarEvaluator pathEval,
+            String inappropriatePartitionPath, IWarningCollector warningCollector) {
+        super(fileExtension, fileSeparator, partition, jobId);
+        this.pathEval = pathEval;
+        this.inappropriatePartitionPath = inappropriatePartitionPath;
+        pathResult = new VoidPointable();
+        charBuffer = new UTF8CharBuffer();
+    }
+
+    @Override
+    void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+        pathEval.evaluate(tuple, pathResult);
+        if (!PointableHelper.isNullOrMissing(pathResult)) {
+            appendEvaluatedPath(pathStringBuilder);
+        } else {
+            // TODO warn
+            pathStringBuilder.append(inappropriatePartitionPath);
+        }
+    }
+
+    /**
+     * Decodes the evaluated UTF-8 string (located right after the one-byte type tag) and appends it to the path.
+     */
+    private void appendEvaluatedPath(StringBuilder pathStringBuilder) throws HyracksDataException {
+        byte[] bytes = pathResult.getByteArray();
+        int stringStart = pathResult.getStartOffset() + 1;
+        try {
+            UTF8StringUtil.readUTF8(bytes, stringStart, charBuffer);
+        } catch (UTFDataFormatException e) {
+            throw HyracksDataException.create(e);
+        }
+        pathStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
new file mode 100644
index 0000000000..a4af805a84
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An {@link IExternalWriter} that writes values through an {@link IExternalFileWriter}, starting a new file for
+ * every partition and whenever the current file has reached the maximum number of values per file.
+ */
+final class ExternalWriter implements IExternalWriter {
+    private final IPathResolver pathResolver;
+    private final IExternalFileWriter writer;
+    private final int maxResultPerFile;
+    // Number of values written to the current file
+    private int tupleCounter;
+
+    public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+        this.pathResolver = pathResolver;
+        this.writer = writer;
+        this.maxResultPerFile = maxResultPerFile;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+        tupleCounter = 0;
+        writer.newFile(pathResolver.getPartitionPath(tuple));
+    }
+
+    /**
+     * Writes the value to the current file, rolling over to the next file first if the current one is full.
+     *
+     * @param value value to write
+     */
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        // Roll over lazily (right before the value that needs the new file) so that a partition whose size
+        // is an exact multiple of maxResultPerFile does not end with an empty file
+        if (tupleCounter >= maxResultPerFile) {
+            tupleCounter = 0;
+            writer.newFile(pathResolver.getNextPath());
+        }
+        writer.write(value);
+        tupleCounter++;
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        writer.abort();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
new file mode 100644
index 0000000000..350c5d6fdc
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class ExternalWriterFactory implements IExternalWriterFactory {
+ private static final long serialVersionUID = 1412969574113419638L;
+ private final IExternalFileWriterFactory writerFactory;
+ private final IExternalFilePrinterFactory printerFactory;
+ private final String fileExtension;
+ private final int maxResult;
+ private final IScalarEvaluatorFactory pathEvalFactory;
+ private final String inappropriatePartitionPath;
+ private final String staticPath;
+
+ public ExternalWriterFactory(IExternalFileWriterFactory writerFactory,
IExternalFilePrinterFactory printerFactory,
+ String fileExtension, int maxResult, IScalarEvaluatorFactory
pathEvalFactory,
+ String inappropriatePartitionPath, String staticPath) {
+ this.writerFactory = writerFactory;
+ this.printerFactory = printerFactory;
+ this.fileExtension = fileExtension;
+ this.maxResult = maxResult;
+ this.pathEvalFactory = pathEvalFactory;
+ this.inappropriatePartitionPath = inappropriatePartitionPath;
+ this.staticPath = staticPath;
+ }
+
+ @Override
+ public IExternalWriter createWriter(IHyracksTaskContext context) throws
HyracksDataException {
+ int partition = context.getTaskAttemptId().getTaskId().getPartition();
+ long jobId = context.getJobletContext().getJobId().getId();
+ char fileSeparator = writerFactory.getFileSeparator();
+ IPathResolver resolver;
+ if (staticPath == null) {
+ EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+ IScalarEvaluator pathEval =
pathEvalFactory.createScalarEvaluator(evaluatorContext);
+ IWarningCollector warningCollector = context.getWarningCollector();
+ resolver = new DynamicPathResolver(fileExtension, fileSeparator,
partition, jobId, pathEval,
+ inappropriatePartitionPath, warningCollector);
+ } else {
+ resolver = new StaticPathResolver(fileExtension, fileSeparator,
partition, jobId, staticPath);
+ }
+ IExternalFileWriter writer = writerFactory.createWriter(context,
printerFactory);
+ return new ExternalWriter(resolver, writer, maxResult);
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
index 5e49be3674..2de5e11454 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -16,30 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.runtime.writer;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.util.Map;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
-
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+@FunctionalInterface
+public interface IExternalFileFilterWriterFactoryProvider {
+ IExternalFileWriterFactory create(Map<String, String> configuration);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
index 5e49be3674..fb89f4a232 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -16,30 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.runtime.writer;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
+/**
+ * An {@link IExternalFileWriter} printer
+ */
+public interface IExternalFilePrinter {
+
+ /**
+ * Open the printer
+ */
+ void open() throws HyracksDataException;
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
+ /**
+ * Initialize the printer with a new stream
+ *
+ * @param outputStream to print to
+ */
+ void newStream(OutputStream outputStream) throws HyracksDataException;
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
+ /**
+ * Print the provided value
+ *
+ * @param value to print
+ */
+ void print(IValueReference value) throws HyracksDataException;
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+ /**
+ * Flush and close the printer
+ */
+ void close();
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
index 5e49be3674..a4fa97bc65 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
@@ -16,30 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.runtime.writer;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
-
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+/**
+ * {@link IExternalFileWriter} printer factory
+ */
+public interface IExternalFilePrinterFactory extends Serializable {
+ /**
+ * @return a new external file printer
+ */
+ IExternalFilePrinter createPrinter();
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
new file mode 100644
index 0000000000..c07e8cb571
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * A file writer
+ */
+public interface IExternalFileWriter {
+
+ /**
+ * Open the writer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Initialize the writer to write to a new path
+ *
+ * @param path of the file to write (including the file name)
+ */
+ void newFile(String path) throws HyracksDataException;
+
+ /**
+ * Write the provided value
+ *
+ * @param value to write
+ */
+ void write(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Run the abort sequence in case of a failure
+ */
+ void abort() throws HyracksDataException;
+
+ /**
+ * Flush the final result and close the writer
+ */
+ void close() throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
new file mode 100644
index 0000000000..2ab6dac563
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for writing to a storage device
+ * Implementer should also provide a singleton to {@link
IExternalFileFilterWriterFactoryProvider}
+ */
+public interface IExternalFileWriterFactory extends Serializable {
+ /**
+ * Create a writer
+ *
+ * @param context task context
+ * @param printerFactory printer factory for writing the final result
+ * @return a new file writer
+ */
+ IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalFilePrinterFactory printerFactory)
+ throws HyracksDataException;
+
+ /**
+ * @return file (or path) separator
+ */
+ char getFileSeparator();
+
+ /**
+ * Validate the writer by running a test write routine to ensure the
writer has the appropriate permissions
+ */
+ void validate() throws AlgebricksException;
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
index 5e49be3674..2419210df5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -16,30 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.runtime.writer;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
+/**
+ * Path resolver which generates paths for the written files
+ */
+interface IPathResolver {
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
+ /**
+ * Extract the partitioning values from the provided tuple and generate
the file path
+ *
+ * @param tuple contains the partitioning values
+ * @return the final path which includes the partitioning values
+ */
+ String getPartitionPath(IFrameTupleReference tuple) throws
HyracksDataException;
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+ /**
+ * @return the path of the next file to be written for the same partition
+ */
+ String getNextPath();
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
similarity index 52%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
index 5e49be3674..34c6575868 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -16,30 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.asterix.runtime.writer;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+final class StaticPathResolver extends AbstractPathResolver {
+ private final String prefix;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
+ StaticPathResolver(String fileExtension, char fileSeparator, int
partition, long jobId, String prefix) {
+ super(fileExtension, fileSeparator, partition, jobId);
+ this.prefix = prefix;
}
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
+ @Override
+ void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference
tuple) throws HyracksDataException {
+ pathStringBuilder.append(prefix);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
new file mode 100644
index 0000000000..e96531fea6
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import java.nio.ByteBuffer;
+
+import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+ private final int sourceColumn;
+ private final int[] partitionColumns;
+ private final IPointable sourceValue;
+ private final PointableTupleReference partitionColumnsPrevCopy;
+ private final PermutingFrameTupleReference partitionColumnsRef;
+ private final IBinaryComparator[] partitionComparators;
+ private final IExternalWriter writer;
+ private FrameTupleAccessor tupleAccessor;
+ private FrameTupleReference tupleRef;
+ private boolean first;
+
+ SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns,
IBinaryComparator[] partitionComparators,
+ RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ this.sourceColumn = sourceColumn;
+ this.partitionColumns = partitionColumns;
+ this.sourceValue = new VoidPointable();
+ partitionColumnsRef = new
PermutingFrameTupleReference(partitionColumns);
+ partitionColumnsPrevCopy =
+ PointableTupleReference.create(partitionColumns.length,
ArrayBackedValueStorage::new);
+ this.partitionComparators = partitionComparators;
+ this.inputRecordDesc = inputRecordDesc;
+ this.writer = writer;
+ first = true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (tupleAccessor == null) {
+ writer.open();
+ tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
+ tupleRef = new FrameTupleReference();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tupleAccessor.reset(buffer);
+ for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+ tupleRef.reset(tupleAccessor, i);
+ if (isNewPartition(i)) {
+ writer.initNewPartition(tupleRef);
+ }
+ setValue(tupleRef, sourceColumn, sourceValue);
+ writer.write(sourceValue);
+ partitionColumnsRef.reset(tupleAccessor, i);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.abort();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ private boolean isNewPartition(int index) throws HyracksDataException {
+ if (first) {
+ first = false;
+ return true;
+ }
+
+ return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy,
tupleAccessor, index, partitionColumns,
+ partitionComparators);
+ }
+
+ private void setValue(IFrameTupleReference tuple, int column, IPointable
value) {
+ byte[] data = tuple.getFieldData(column);
+ int start = tuple.getFieldStart(column);
+ int length = tuple.getFieldLength(column);
+ value.set(data, start, length);
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
new file mode 100644
index 0000000000..6220dec444
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class SinkExternalWriterRuntimeFactory extends
AbstractPushRuntimeFactory {
+ private static final long serialVersionUID = -2215789207336628581L;
+ private final int sourceColumn;
+ private final int[] partitionColumn;
+ private final IBinaryComparatorFactory[] partitionComparatorFactories;
+ private final RecordDescriptor inputRecordDescriptor;
+ private final IExternalWriterFactory writerFactory;
+
+ public SinkExternalWriterRuntimeFactory(int sourceColumn, int[]
partitionColumn,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
RecordDescriptor inputRecordDescriptor,
+ IExternalWriterFactory writerFactory) {
+ this.sourceColumn = sourceColumn;
+ this.partitionColumn = partitionColumn;
+ this.partitionComparatorFactories = partitionComparatorFactories;
+ this.inputRecordDescriptor = inputRecordDescriptor;
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ IExternalWriter writer = writerFactory.createWriter(ctx);
+ IBinaryComparator[] partitionComparators = new
IBinaryComparator[partitionComparatorFactories.length];
+ for (int i = 0; i < partitionComparatorFactories.length; i++) {
+ partitionComparators[i] =
partitionComparatorFactories[i].createBinaryComparator();
+ }
+ SinkExternalWriterRuntime runtime = new
SinkExternalWriterRuntime(sourceColumn, partitionColumn,
+ partitionComparators, inputRecordDescriptor, writer);
+ return new IPushRuntime[] { runtime };
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
new file mode 100644
index 0000000000..81ed880425
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.writers;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An external writer of a query (or dataset) result
+ */
+public interface IExternalWriter {
+ /**
+ * Open the writer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Initialize the writer for a new partition
+ *
+ * @param tuple which contains the partitioning columns
+ */
+ void initNewPartition(IFrameTupleReference tuple) throws
HyracksDataException;
+
+ /**
+ * Write the provided value
+ *
+ * @param value to be written
+ */
+ void write(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Run the abort sequence in case of a failure
+ */
+ void abort() throws HyracksDataException;
+
+ /**
+ * Flush the final result and close the writer
+ */
+ void close() throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
index 5e49be3674..e6899a61b6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
@@ -16,30 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.hyracks.algebricks.runtime.writers;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
- }
-
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
- }
-
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
- }
- return writeBuffer;
- }
+/**
+ * A writer factory which creates a writer for the result of a query (or
dataset) to a storage device
+ */
+@FunctionalInterface
+public interface IExternalWriterFactory extends Serializable {
+ /**
+ * Create a new writer
+ *
+ * @param context task context
+ * @return new writer
+ */
+ IExternalWriter createWriter(IHyracksTaskContext context) throws
HyracksDataException;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
similarity index 53%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
copy to
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
index 5e49be3674..30ebf69d60 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
@@ -16,30 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud;
+package org.apache.hyracks.util.string;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+public class UTF8CharBuffer {
+ private char[] buffer;
+ private int filledLength;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-public class WriteBufferProvider {
- private final BlockingQueue<ByteBuffer> writeBuffers;
-
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
+ public char[] getBuffer() {
+ return buffer;
}
- public void recycle(ByteBuffer buffer) {
- writeBuffers.offer(buffer);
+ public int getFilledLength() {
+ return filledLength;
}
- public ByteBuffer getBuffer() {
- ByteBuffer writeBuffer = writeBuffers.poll();
- if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
+ char[] getBuffer(int requiredLength) {
+ if (buffer == null || buffer.length < requiredLength) {
+ buffer = new char[requiredLength];
}
- return writeBuffer;
+
+ return buffer;
+ }
+
+ void setFilledLength(int filledLength) {
+ this.filledLength = filledLength;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index cde79cb95a..4d3aceddda 100644
---
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.lang.ref.SoftReference;
+import java.util.Objects;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
@@ -535,14 +536,29 @@ public class UTF8StringUtil {
chararr = reader.chararr;
}
+ in.readFully(bytearr, 0, utflen);
+
+ int chararr_count = readUTF8(bytearr, 0, utflen, chararr);
+
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ public static void readUTF8(byte[] bytearr, int start, UTF8CharBuffer
buffer) throws UTFDataFormatException {
+ Objects.requireNonNull(buffer);
+ int utflen = VarLenIntEncoderDecoder.decode(bytearr, start);
+ int lengthIntSize = VarLenIntEncoderDecoder.getBytesRequired(utflen);
+ char[] chararr = buffer.getBuffer(utflen);
+ buffer.setFilledLength(readUTF8(bytearr, start + lengthIntSize,
utflen, chararr));
+ }
+
+ private static int readUTF8(byte[] bytearr, int start, int utflen, char[]
chararr) throws UTFDataFormatException {
int c, char2, char3;
int count = 0;
int chararr_count = 0;
- in.readFully(bytearr, 0, utflen);
-
while (count < utflen) {
- c = bytearr[count] & 0xff;
+ c = bytearr[start + count] & 0xff;
if (c > 127) {
break;
}
@@ -596,8 +612,7 @@ public class UTF8StringUtil {
throw new UTFDataFormatException("malformed input around
byte " + count);
}
}
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
+ return chararr_count;
}
/**