This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new de7c56a756 [ASTERIXDB-3353][EXT] Support reading avro from localfs. de7c56a756 is described below commit de7c56a7563e0eb94b89321abefb19a55a849cd0 Author: ayush-couchbase <ayush.tripa...@couchbase.com> AuthorDate: Tue Feb 13 16:13:42 2024 +0530 [ASTERIXDB-3353][EXT] Support reading avro from localfs. - user model changes: no - storage format changes: no - interface changes: yes Details: - Read avro files from local storage. Change-Id: Ibdacf4e6b156a3b6ef15b4420f4102c122f8bf1e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18153 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Wail Alkowaileet <wael....@gmail.com> --- .../asterix/external/api/IRecordReaderFactory.java | 3 +- .../reader/stream/AbstractStreamRecordReader.java | 59 +++++++ .../record/reader/stream/AvroRecordReader.java | 145 ++++++++++++++++++ .../record/reader/stream/StreamRecordReader.java | 38 +---- .../reader/stream/StreamRecordReaderFactory.java | 30 ++-- .../stream/DiscretizedMultipleInputStream.java | 127 +++++++++++++++ .../asterix/external/parser/AvroDataParser.java | 170 +++++++++++++++++++++ ....java => AbstractGenericDataParserFactory.java} | 54 +------ .../parser/factory/AvroDataParserFactory.java | 68 +++++++++ .../parser/factory/JSONDataParserFactory.java | 49 +----- .../provider/StreamRecordReaderProvider.java | 18 ++- .../context/ExternalReaderRuntimeDataContext.java | 6 +- .../external/util/ExternalDataConstants.java | 3 +- ....apache.asterix.external.api.IDataParserFactory | 3 +- 14 files changed, 621 insertions(+), 152 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java index 64be02e390..4708f52743 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.api; import java.util.List; import java.util.Set; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -28,7 +29,7 @@ public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory { IRecordReader<? extends T> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException; - Class<?> getRecordClass(); + Class<?> getRecordClass() throws AsterixException; @Override default DataSourceType getDataSourceType() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java new file mode 100644 index 0000000000..eb1d754f62 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.stream; + +import static org.apache.asterix.external.util.ExternalDataConstants.EMPTY_STRING; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_REDACT_WARNINGS; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public abstract class AbstractStreamRecordReader<T> implements IRecordReader<T> { + private Supplier<String> dataSourceName = EMPTY_STRING; + private Supplier<String> previousDataSourceName = EMPTY_STRING; + + protected final void setSuppliers(Map<String, String> config, Supplier<String> dataSourceName, + Supplier<String> previousDataSourceName) { + if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) { + this.dataSourceName = dataSourceName; + this.previousDataSourceName = previousDataSourceName; + } + } + + @Override + public final Supplier<String> getDataSourceName() { + return dataSourceName; + } + + protected final String getPreviousStreamName() { + return previousDataSourceName.get(); + } + + public abstract List<String> getRecordReaderFormats(); + + public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) + throws HyracksDataException; +} diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java new file mode 100644 index 0000000000..e048890165 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.stream; + +import static org.apache.asterix.external.util.ExternalDataConstants.EMPTY_STRING; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_REDACT_WARNINGS; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; +import org.apache.asterix.external.input.stream.DiscretizedMultipleInputStream; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.IFeedLogManager; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AvroRecordReader extends AbstractStreamRecordReader<GenericRecord> { + private final org.apache.asterix.external.input.record.GenericRecord<GenericRecord> record; + private final DiscretizedMultipleInputStream inputStream; + private final Supplier<String> dataSourceName; + private GenericRecord avroRecord; + private DataFileStream<GenericRecord> dataFileStream; + private boolean done; + private static final List<String> recordReaderFormats = + Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_AVRO)); + + public AvroRecordReader(AsterixInputStream inputStream, Map<String, String> config) throws IOException { + record = new org.apache.asterix.external.input.record.GenericRecord<>(); + this.inputStream = new DiscretizedMultipleInputStream(inputStream); + done = false; + if 
(ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) { + dataSourceName = EMPTY_STRING; + } else { + dataSourceName = inputStream::getStreamName; + } + + advance(); + } + + @Override + public void close() throws IOException { + try { + if (!done) { + inputStream.close(); + } + } finally { + done = true; + } + } + + @Override + public boolean stop() { + try { + inputStream.stop(); + return true; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + + @Override + public IRawRecord<GenericRecord> next() throws IOException { + avroRecord = dataFileStream.next(avroRecord); + record.set(avroRecord); + return record; + } + + @Override + public boolean hasNext() throws IOException { + if (dataFileStream == null) { + return false; + } + if (dataFileStream.hasNext()) { + return true; + } + return advance() && dataFileStream.hasNext(); + } + + @Override + public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException { + inputStream.setFeedLogManager(feedLogManager); + } + + @Override + public void setController(AbstractFeedDataFlowController controller) { + inputStream.setController(controller); + } + + @Override + public boolean handleException(Throwable th) { + return inputStream.handleException(th); + } + + @Override + public List<String> getRecordReaderFormats() { + return recordReaderFormats; + + } + + @Override + public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) + throws HyracksDataException { + + } + + private boolean advance() throws IOException { + if (inputStream.advance()) { + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + dataFileStream = new DataFileStream<>(inputStream, datumReader); + return true; + } + + return false; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index 937f3feb0b..5ef63bc61f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -18,27 +18,21 @@ */ package org.apache.asterix.external.input.record.reader.stream; -import static org.apache.asterix.external.util.ExternalDataConstants.EMPTY_STRING; -import static org.apache.asterix.external.util.ExternalDataConstants.KEY_REDACT_WARNINGS; - import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.function.Supplier; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IRawRecord; -import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IStreamNotificationHandler; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.CharArrayRecord; import org.apache.asterix.external.input.stream.AsterixInputStreamReader; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.IFeedLogManager; -import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler { +public abstract class StreamRecordReader extends AbstractStreamRecordReader<char[]> + implements IStreamNotificationHandler { protected AsterixInputStreamReader reader; protected CharArrayRecord record; protected char[] inputBuffer; @@ -46,18 +40,13 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre protected int bufferPosn = 0; protected boolean done = false; protected 
IFeedLogManager feedLogManager; - private Supplier<String> dataSourceName = EMPTY_STRING; - private Supplier<String> previousDataSourceName = EMPTY_STRING; - public void configure(AsterixInputStream inputStream, Map<String, String> config) { + protected void configure(AsterixInputStream inputStream, Map<String, String> config) { int bufferSize = ExternalDataUtils.getOrDefaultBufferSize(config); this.reader = new AsterixInputStreamReader(inputStream, bufferSize); record = new CharArrayRecord(); inputBuffer = new char[bufferSize]; - if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) { - this.dataSourceName = reader::getStreamName; - this.previousDataSourceName = reader::getPreviousStreamName; - } + setSuppliers(config, reader::getStreamName, reader::getPreviousStreamName); } @Override @@ -87,9 +76,6 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre } } - @Override - public abstract boolean hasNext() throws IOException; - @Override public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException { this.feedLogManager = feedLogManager; @@ -115,19 +101,5 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre record.reset(); } - @Override - public final Supplier<String> getDataSourceName() { - return dataSourceName; - } - - String getPreviousStreamName() { - return previousDataSourceName.get(); - } - - public abstract List<String> getRecordReaderFormats(); - public abstract String getRequiredConfigs(); - - public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) - throws HyracksDataException; -} +} \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index 
d8168ced73..985e2b5d29 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -18,12 +18,14 @@ */ package org.apache.asterix.external.input.record.reader.stream; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; @@ -45,12 +47,12 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; -public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { +public class StreamRecordReaderFactory implements IRecordReaderFactory<Object> { private static final long serialVersionUID = 1L; protected IInputStreamFactory streamFactory; protected Map<String, String> configuration; - protected Class recordReaderClazz; + protected Class<?> recordReaderClazz; protected IExternalFilterEvaluatorFactory filterEvaluatorFactory; private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, @@ -63,8 +65,8 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { } @Override - public final Class<?> getRecordClass() { - return char[].class; + public final Class<?> getRecordClass() throws AsterixException { + return StreamRecordReaderProvider.getRecordClass(configuration); } @Override @@ -84,9 +86,8 @@ public class 
StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { } @Override - public final IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context) - throws HyracksDataException { - StreamRecordReader reader = createReader(context); + public final IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException { + AbstractStreamRecordReader<?> reader = createReader(context); ((ExternalReaderRuntimeDataContext) context).setReader(reader); return reader; } @@ -119,16 +120,23 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { } @SuppressWarnings("unchecked") - private StreamRecordReader createReader(IExternalDataRuntimeContext context) throws HyracksDataException { + private AbstractStreamRecordReader<?> createReader(IExternalDataRuntimeContext context) + throws HyracksDataException { try { - StreamRecordReader streamRecordReader = - (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); + AbstractStreamRecordReader<?> streamRecordReader; + if (recordReaderClazz.equals(AvroRecordReader.class)) { + streamRecordReader = new AvroRecordReader(streamFactory.createInputStream(context), configuration); + } else { + streamRecordReader = (AbstractStreamRecordReader<?>) recordReaderClazz.getConstructor().newInstance(); + } streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context), configuration); return streamRecordReader; } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw HyracksDataException.create(e); + } catch (IOException e) { + throw new RuntimeException(e); } } -} +} \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/DiscretizedMultipleInputStream.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/DiscretizedMultipleInputStream.java new file mode 100644 index 0000000000..38b9ae736b --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/DiscretizedMultipleInputStream.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.stream; + +import java.io.IOException; + +import org.apache.asterix.external.api.AsterixInputStream; + +public class DiscretizedMultipleInputStream extends AsterixInputStream { + private final IStreamWrapper stream; + + public DiscretizedMultipleInputStream(AsterixInputStream inputStream) { + if (inputStream instanceof AbstractMultipleInputStream) { + AbstractMultipleInputStream multipleInputStream = (AbstractMultipleInputStream) inputStream; + stream = new MultipleStreamWrapper(multipleInputStream); + } else { + stream = new SingleStreamWrapper(inputStream); + } + } + + @Override + public int read() throws IOException { + return stream.read(); + } + + public boolean advance() throws IOException { + return stream.advance(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public boolean stop() throws Exception { + return stream.getInputStream().stop(); + } + + @Override + public boolean handleException(Throwable th) { + return stream.getInputStream().handleException(th); + } + + private interface IStreamWrapper { + boolean advance() throws IOException; + + int read() throws IOException; + + int read(byte[] b, int off, int len) throws IOException; + + AsterixInputStream getInputStream(); + } + + private static class MultipleStreamWrapper implements IStreamWrapper { + private final AbstractMultipleInputStream inputStream; + + private MultipleStreamWrapper(AbstractMultipleInputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public boolean advance() throws IOException { + return inputStream.advance(); + } + + @Override + public int read() throws IOException { + return inputStream.in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStream.in.read(b, off, len); + } + + @Override + public AsterixInputStream getInputStream() { + return inputStream; + } 
+ } + + private static class SingleStreamWrapper implements IStreamWrapper { + private final AsterixInputStream inputStream; + + private SingleStreamWrapper(AsterixInputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public boolean advance() { + return false; + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); + } + + @Override + public AsterixInputStream getInputStream() { + return inputStream; + } + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java new file mode 100644 index 0000000000..d1744ebe08 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.parser; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.asterix.builders.IARecordBuilder; +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.api.IRecordDataParser; +import org.apache.asterix.external.parser.jackson.ParserContext; +import org.apache.asterix.om.base.ABoolean; +import org.apache.asterix.om.base.ANull; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IMutableValueStorage; + +public class AvroDataParser extends AbstractDataParser implements IRecordDataParser<GenericRecord> { + private final ParserContext parserContext; + + public AvroDataParser(IExternalDataRuntimeContext context) { + parserContext = new ParserContext(); + } + + @Override + public boolean parse(IRawRecord<? 
extends GenericRecord> record, DataOutput out) throws HyracksDataException { + try { + parseObject(record.get(), out); + return true; + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + private final void parseObject(GenericRecord record, DataOutput out) throws IOException { + Schema schema = record.getSchema(); + IMutableValueStorage valueBuffer = parserContext.enterObject(); + IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE); + for (Schema.Field field : schema.getFields()) { + valueBuffer.reset(); + parseValue(field.schema(), record.get(field.name()), valueBuffer.getDataOutput()); + objectBuilder.addField(parserContext.getSerializedFieldName(field.name()), valueBuffer); + } + objectBuilder.write(out, true); + parserContext.exitObject(valueBuffer, null, objectBuilder); + } + + private final void parseArray(Schema arraySchema, Collection<?> elements, DataOutput out) throws IOException { + Schema elementSchema = arraySchema.getElementType(); + final IMutableValueStorage valueBuffer = parserContext.enterCollection(); + final IAsterixListBuilder arrayBuilder = + parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE); + for (Object element : elements) { + valueBuffer.reset(); + parseValue(elementSchema, element, valueBuffer.getDataOutput()); + arrayBuilder.addItem(valueBuffer); + } + arrayBuilder.write(out, true); + parserContext.exitCollection(valueBuffer, arrayBuilder); + } + + private void parseValue(Schema schema, Object value, DataOutput out) throws IOException { + Schema.Type type = schema.getType(); + switch (type) { + case RECORD: + parseObject((GenericRecord) value, out); + break; + case ARRAY: + parseArray(schema, (Collection<?>) value, out); + break; + case MAP: + case UNION: + case ENUM: + case FIXED: + case NULL: + nullSerde.serialize(ANull.NULL, out); + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + 
serializeNumeric(value, type, out); + break; + case STRING: + serializeString(value, type, out); + break; + case BYTES: + serializeBinary(value, type, out); + break; + case BOOLEAN: + if ((Boolean) value) { + booleanSerde.serialize(ABoolean.TRUE, out); + } else { + booleanSerde.serialize(ABoolean.FALSE, out); + } + break; + default: + throw new RuntimeDataException(ErrorCode.PARSE_ERROR, value.toString()); + } + } + + private void serializeNumeric(Object value, Schema.Type type, DataOutput out) throws IOException { + switch (type) { + case INT: + aInt32.setValue((Integer) value); + int32Serde.serialize(aInt32, out); + break; + case LONG: + aInt64.setValue((Long) value); + int64Serde.serialize(aInt64, out); + break; + case FLOAT: + aFloat.setValue((Float) value); + floatSerde.serialize(aFloat, out); + break; + case DOUBLE: + aDouble.setValue((Double) value); + doubleSerde.serialize(aDouble, out); + break; + default: + throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Error"); + } + } + + private void serializeString(Object value, Schema.Type type, DataOutput out) throws IOException { + switch (type) { + case STRING: + aString.setValue(value.toString()); + stringSerde.serialize(aString, out); + break; + default: + throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Error"); + } + } + + private void serializeBinary(Object value, Schema.Type type, DataOutput out) throws IOException { + switch (type) { + case BYTES: + aBinary.setValue(((ByteBuffer) value).array(), 0, ((ByteBuffer) value).array().length); + binarySerde.serialize(aBinary, out); + break; + default: + throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Error"); + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractGenericDataParserFactory.java similarity index 60% copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractGenericDataParserFactory.java index 68890678db..d2aa4ea552 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractGenericDataParserFactory.java @@ -24,70 +24,18 @@ import java.util.List; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.api.IExternalDataRuntimeContext; -import org.apache.asterix.external.api.IRecordDataParser; -import org.apache.asterix.external.api.IStreamDataParser; -import org.apache.asterix.external.parser.JSONDataParser; -import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.om.types.AOrderedListType; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.IAType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; +public abstract class AbstractGenericDataParserFactory<T> extends AbstractRecordStreamParserFactory<T> { -public class JSONDataParserFactory extends AbstractRecordStreamParserFactory<char[]> { - - private static final long serialVersionUID = 1L; - private static final List<String> PARSER_FORMAT = Collections.unmodifiableList( - Arrays.asList(ExternalDataConstants.FORMAT_JSON_LOWER_CASE, ExternalDataConstants.FORMAT_JSON_UPPER_CASE)); private static final List<ATypeTag> UNSUPPORTED_TYPES = Collections .unmodifiableList(Arrays.asList(ATypeTag.MULTISET, ATypeTag.POINT3D, ATypeTag.CIRCLE, ATypeTag.RECTANGLE, ATypeTag.INTERVAL, 
ATypeTag.DAYTIMEDURATION, ATypeTag.DURATION, ATypeTag.BINARY)); - private final JsonFactory jsonFactory; - - public JSONDataParserFactory() { - jsonFactory = new JsonFactory(); - jsonFactory.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); - jsonFactory.configure(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES, true); - jsonFactory.configure(JsonFactory.Feature.INTERN_FIELD_NAMES, true); - } - - @Override - public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) { - return createParser(context); - } - - @Override - public void setMetaType(ARecordType metaType) { - // no MetaType to set. - } - - @Override - public List<String> getParserFormats() { - return PARSER_FORMAT; - } - - @Override - public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) { - return createParser(context); - } - - @Override - public Class<?> getRecordClass() { - return char[].class; - } - - private JSONDataParser createParser(IExternalDataRuntimeContext context) { - return new JSONDataParser(recordType, jsonFactory, context); - } - - /* - * check type compatibility before creating the parser. - */ @Override public void setRecordType(ARecordType recordType) throws AsterixException { checkRecordTypeCompatibility(recordType); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java new file mode 100644 index 0000000000..d1d20419f5 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.parser.factory; + +import java.util.List; + +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.api.IRecordDataParser; +import org.apache.asterix.external.api.IStreamDataParser; +import org.apache.asterix.external.parser.AvroDataParser; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.om.types.ARecordType; +import org.apache.avro.generic.GenericRecord; + +public class AvroDataParserFactory extends AbstractGenericDataParserFactory<GenericRecord> { + + private static final long serialVersionUID = 1L; + private static final List<String> PARSER_FORMAT = List.of(ExternalDataConstants.FORMAT_AVRO); + + public AvroDataParserFactory() { + } + + @Override + public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) { + throw new UnsupportedOperationException("AvroDataParserFactory does not support creating an input stream parser"); + } + + @Override + public void setMetaType(ARecordType metaType) { + // no MetaType to set. 
+ } + + @Override + public List<String> getParserFormats() { + return PARSER_FORMAT; + } + + @Override + public IRecordDataParser<GenericRecord> createRecordParser(IExternalDataRuntimeContext context) { + return createParser(context); + } + + @Override + public Class<?> getRecordClass() { + return GenericRecord.class; + } + + private AvroDataParser createParser(IExternalDataRuntimeContext context) { + return new AvroDataParser(context); + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java index 68890678db..7eb87fb263 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java @@ -22,31 +22,21 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IStreamDataParser; import org.apache.asterix.external.parser.JSONDataParser; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.om.types.AOrderedListType; import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.AUnionType; -import org.apache.asterix.om.types.IAType; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; -public class JSONDataParserFactory extends AbstractRecordStreamParserFactory<char[]> { +public class JSONDataParserFactory extends 
AbstractGenericDataParserFactory<char[]> { private static final long serialVersionUID = 1L; private static final List<String> PARSER_FORMAT = Collections.unmodifiableList( Arrays.asList(ExternalDataConstants.FORMAT_JSON_LOWER_CASE, ExternalDataConstants.FORMAT_JSON_UPPER_CASE)); - private static final List<ATypeTag> UNSUPPORTED_TYPES = Collections - .unmodifiableList(Arrays.asList(ATypeTag.MULTISET, ATypeTag.POINT3D, ATypeTag.CIRCLE, ATypeTag.RECTANGLE, - ATypeTag.INTERVAL, ATypeTag.DAYTIMEDURATION, ATypeTag.DURATION, ATypeTag.BINARY)); - private final JsonFactory jsonFactory; public JSONDataParserFactory() { @@ -85,41 +75,4 @@ public class JSONDataParserFactory extends AbstractRecordStreamParserFactory<cha return new JSONDataParser(recordType, jsonFactory, context); } - /* - * check type compatibility before creating the parser. - */ - @Override - public void setRecordType(ARecordType recordType) throws AsterixException { - checkRecordTypeCompatibility(recordType); - super.setRecordType(recordType); - } - - /** - * Check if the defined type contains ADM special types. - * if it contains unsupported types. 
- * - * @param recordType - * @throws AsterixException - */ - private void checkRecordTypeCompatibility(ARecordType recordType) throws AsterixException { - final IAType[] fieldTypes = recordType.getFieldTypes(); - for (IAType type : fieldTypes) { - checkTypeCompatibility(type); - } - } - - private void checkTypeCompatibility(IAType type) throws AsterixException { - if (UNSUPPORTED_TYPES.contains(type.getTypeTag())) { - throw new AsterixException(ErrorCode.TYPE_UNSUPPORTED, JSONDataParserFactory.class.getName(), - type.getTypeTag().toString()); - } else if (type.getTypeTag() == ATypeTag.ARRAY) { - checkTypeCompatibility(((AOrderedListType) type).getItemType()); - } else if (type.getTypeTag() == ATypeTag.OBJECT) { - checkRecordTypeCompatibility((ARecordType) type); - } else if (type.getTypeTag() == ATypeTag.UNION) { - checkTypeCompatibility(((AUnionType) type).getActualType()); - } - //Compatible type - } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java index 4165fa8090..97b04a4650 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java @@ -32,8 +32,10 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.input.record.reader.stream.AvroRecordReader; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; @@ -55,6 +57,18 @@ public class 
StreamRecordReaderProvider { // do nothing } + public static Class<?> getRecordClass(Map<String, String> configuration) throws AsterixException { + String format = configuration.get(ExternalDataConstants.KEY_FORMAT); + if (format == null) { + throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT); + } + if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_AVRO)) { + return GenericRecord.class; + } + // By default, return char[] + return char[].class; + } + public static Class findRecordReaderClazzWithConfig(Map<String, String> configuration, String format) throws AsterixException { List<Pair<String[], Class>> requiredConfigs = recordReaders.get(format); @@ -90,12 +104,14 @@ public class StreamRecordReaderProvider { public static Class getRecordReaderClazz(Map<String, String> configuration) throws AsterixException { String format = configuration.get(ExternalDataConstants.KEY_FORMAT); - if (recordReaders == null) { recordReaders = initRecordReaders(); } if (format != null) { + if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_AVRO)) { + return AvroRecordReader.class; + } if (recordReaders.containsKey(format)) { return findRecordReaderClazzWithConfig(configuration, format); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java index 587fc3ee69..3fe3625fbd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java @@ -21,13 +21,13 @@ package org.apache.asterix.external.provider.context; import java.util.function.LongSupplier; import java.util.function.Supplier; +import 
org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; -import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; import org.apache.hyracks.api.context.IHyracksTaskContext; public class ExternalReaderRuntimeDataContext extends ExternalStreamRuntimeDataContext { private final IExternalFilterValueEmbedder valueEmbedder; - private StreamRecordReader reader; + private IRecordReader<?> reader; public ExternalReaderRuntimeDataContext(IHyracksTaskContext context, int partition, IExternalFilterValueEmbedder valueEmbedder) { @@ -59,7 +59,7 @@ public class ExternalReaderRuntimeDataContext extends ExternalStreamRuntimeDataC return valueEmbedder; } - public void setReader(StreamRecordReader reader) { + public void setReader(IRecordReader<?> reader) { this.reader = reader; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 7c30b64425..c0af89fa65 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -190,6 +190,7 @@ public class ExternalDataConstants { */ public static final String FORMAT_BINARY = "binary"; public static final String FORMAT_ADM = "adm"; + public static final String FORMAT_AVRO = "avro"; public static final String FORMAT_JSON_LOWER_CASE = "json"; public static final String FORMAT_JSON_UPPER_CASE = "JSON"; public static final String FORMAT_DELIMITED_TEXT = "delimited-text"; @@ -210,7 +211,7 @@ public class ExternalDataConstants { static { ALL_FORMATS = Set.of(FORMAT_BINARY, FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_DELIMITED_TEXT, FORMAT_TWEET, FORMAT_RSS, FORMAT_SEMISTRUCTURED, FORMAT_LINE_SEPARATED, 
FORMAT_HDFS_WRITABLE, FORMAT_KV, FORMAT_CSV, - FORMAT_TSV, FORMAT_PARQUET); + FORMAT_TSV, FORMAT_PARQUET, FORMAT_AVRO); TEXTUAL_FORMATS = Set.of(FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_CSV, FORMAT_TSV); } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory index 95cfd13ab6..bbf5195f86 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory @@ -21,4 +21,5 @@ org.apache.asterix.external.parser.factory.DelimitedDataParserFactory org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory org.apache.asterix.external.parser.factory.RSSParserFactory org.apache.asterix.external.parser.factory.TweetParserFactory -org.apache.asterix.external.parser.factory.NoOpDataParserFactory \ No newline at end of file +org.apache.asterix.external.parser.factory.NoOpDataParserFactory +org.apache.asterix.external.parser.factory.AvroDataParserFactory \ No newline at end of file