This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 6e1233f87cf3399cc61946966d85f2561c4bcfbd Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Sep 27 13:57:26 2021 +0200 [WAYANG-34] add the ObjectFileSource to the basic and platforms Signed-off-by: bertty <[email protected]> --- .../scala/org/apache/wayang/api/PlanBuilder.scala | 11 +- .../wayang/basic/operators/ObjectFileSource.java | 197 +++++++++++++++++++++ .../org/apache/wayang/flink/mapping/Mappings.java | 1 + .../flink/mapping/ObjectFileSourceMapping.java | 67 +++++++ .../flink/operators/FlinkObjectFileSource.java | 16 +- .../org/apache/wayang/java/mapping/Mappings.java | 1 + .../java/mapping/ObjectFileSourceMapping.java | 66 +++++++ .../java/operators/JavaObjectFileSource.java | 19 +- .../org/apache/wayang/spark/mapping/Mappings.java | 1 + .../spark/mapping/ObjectFileSourceMapping.java | 68 +++++++ .../spark/operators/SparkObjectFileSource.java | 16 +- 11 files changed, 439 insertions(+), 24 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/PlanBuilder.scala index 9056401..f391bed 100644 --- a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -24,7 +24,7 @@ package org.apache.wayang.api import org.apache.commons.lang3.Validate import org.apache.wayang.api import org.apache.wayang.basic.data.Record -import org.apache.wayang.basic.operators.{CollectionSource, TableSource, TextFileSource} +import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource} import org.apache.wayang.commons.util.profiledb.model.Experiment import org.apache.wayang.core.api.WayangContext import org.apache.wayang.core.plan.wayangplan._ @@ -114,6 +114,15 @@ class PlanBuilder(wayangContext: WayangContext, private var jobName: String = nu */ def 
readTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url)) + + /** + * Reads an object file and provides it as a dataset of [[Object]]s. + * + * @param url the URL of the object file + * @return [[DataQuanta]] representing the file + */ + def readObjectFile[T: ClassTag](url: String): DataQuanta[T] = load(new ObjectFileSource(url, dataSetType[T])) + /** * Reads a database table and provides them as a dataset of [[Record]]s. * diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java new file mode 100644 index 0000000..7ca3410 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.basic.operators; + +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import org.apache.commons.lang3.Validate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; +import org.apache.wayang.core.plan.wayangplan.UnarySource; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.fs.FileSystems; + +/** + * This source reads an object file and outputs the contained objects as data units. + */ +public class ObjectFileSource<T> extends UnarySource<T> { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final String inputUrl; + + private final Class<T> tClass; + + public ObjectFileSource(String inputUrl, DataSetType<T> type) { + super(type); + this.inputUrl = inputUrl; + this.tClass = type.getDataUnitType().getTypeClass(); + } + + public ObjectFileSource(String inputUrl, Class<T> tClass) { + super(DataSetType.createDefault(tClass)); + this.inputUrl = inputUrl; + this.tClass = tClass; + } + + /** + * Copies an instance (exclusive of broadcasts). 
+ * + * @param that the instance that should be copied + */ + public ObjectFileSource(ObjectFileSource that) { + super(that); + this.inputUrl = that.getInputUrl(); + this.tClass = that.getTypeClass(); + } + + public String getInputUrl() { + return this.inputUrl; + } + + public Class<T> getTypeClass(){ + return this.tClass; + } + + @Override + public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator( + final int outputIndex, + final Configuration configuration) { + Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex); + return Optional.of(new ObjectFileSource.CardinalityEstimator()); + } + + + /** + * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link ObjectFileSource}s. + */ + protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator { + + public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7); + + public static final double CORRECTNESS_PROBABILITY = 0.95d; + + /** + * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}. + */ + public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05; + + @Override + public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) { + //TODO validate if the implementation apply for the case + Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length); + + // see Job for StopWatch measurements + final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start( + "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities" + ); + + // Query the job cache first to see if there is already an estimate. 
+ String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl); + CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class); + if (cardinalityEstimate != null) return cardinalityEstimate; + + // Otherwise calculate the cardinality. + // First, inspect the size of the file and its line sizes. + OptionalLong fileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl); + if (!fileSize.isPresent()) { + ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.", + ObjectFileSource.this.inputUrl); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + + } else if (fileSize.getAsLong() == 0L) { + timeMeasurement.stop(); + return new CardinalityEstimate(0L, 0L, 1d); + } + + OptionalDouble bytesPerLine = this.estimateBytesPerLine(); + if (!bytesPerLine.isPresent()) { + ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.", + ObjectFileSource.this.inputUrl); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + } + + // Extrapolate a cardinality estimate for the complete file. + double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble(); + double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION; + cardinalityEstimate = new CardinalityEstimate( + (long) (numEstimatedLines - expectedDeviation), + (long) (numEstimatedLines + expectedDeviation), + CORRECTNESS_PROBABILITY + ); + + // Cache the result, so that it will not be recalculated again. + optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate); + + timeMeasurement.stop(); + return cardinalityEstimate; + } + + /** + * Estimate the number of bytes that are in each line of a given file. 
+ * + * @return the average number of bytes per line if it could be determined + */ + private OptionalDouble estimateBytesPerLine() { + //TODO validate if the implementation apply for the case +// final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl); +// if (fileSystem.isPresent()) { +// +// // Construct a limited reader for the first x KiB of the file. +// final int KiB = 1024; +// final int MiB = 1024 * KiB; +// try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open( +// ObjectFileSource.this.inputUrl), 1 * MiB)) { +// final BufferedReader bufferedReader = new BufferedReader( +// new InputStreamReader(lis, ObjectFileSource.this.encoding) +// ); +// +// // Read as much as possible. +// char[] cbuf = new char[1024]; +// int numReadChars, numLineFeeds = 0; +// while ((numReadChars = bufferedReader.read(cbuf)) != -1) { +// for (int i = 0; i < numReadChars; i++) { +// if (cbuf[i] == '\n') { +// numLineFeeds++; +// } +// } +// } +// +// if (numLineFeeds == 0) { +// ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl); +// return OptionalDouble.empty(); +// } +// return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds); +// } catch (IOException e) { +// ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e); +// } +// } + + return OptionalDouble.empty(); + } + } + +} diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java index 3a89d6c..9a31979 100644 --- a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java @@ -55,6 +55,7 @@ public class Mappings { new TextFileSinkMapping(), new ObjectFileSinkMapping(), new 
TextFileSourceMapping(), + new ObjectFileSourceMapping(), new UnionAllMapping(), new ZipWithIdMapping() ); diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java new file mode 100644 index 0000000..85d67e9 --- /dev/null +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.flink.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.ObjectFileSource; +import org.apache.wayang.basic.operators.TextFileSource; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.flink.operators.FlinkObjectFileSource; +import org.apache.wayang.flink.operators.FlinkTextFileSource; +import org.apache.wayang.flink.platform.FlinkPlatform; + +/** + * Mapping from {@link ObjectFileSource} to {@link FlinkObjectFileSource}. + */ +@SuppressWarnings("unchecked") +public class ObjectFileSourceMapping implements Mapping { + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + FlinkPlatform.getInstance() + )); + } + + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern( + "source", + new ObjectFileSource<>( + null, + DataSetType.none().getDataUnitType().getTypeClass() + ), + false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>( + (matchedOperator, epoch) -> new FlinkObjectFileSource<>(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java index fb1e569..9c304a4 100644 --- 
a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java @@ -27,6 +27,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.wayang.basic.channels.FileChannel; import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.operators.ObjectFileSource; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -52,17 +53,18 @@ import java.util.List; * * @see FlinkObjectFileSource */ -public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements FlinkExecutionOperator { +public class FlinkObjectFileSource<Type> extends ObjectFileSource<Type> implements FlinkExecutionOperator { - private final String sourcePath; + public FlinkObjectFileSource(ObjectFileSource<Type> that) { + super(that); + } public FlinkObjectFileSource(DataSetType<Type> type) { this(null, type); } public FlinkObjectFileSource(String sourcePath, DataSetType<Type> type) { - super(type); - this.sourcePath = sourcePath; + super(sourcePath, type); } @Override @@ -76,12 +78,12 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl assert outputs.length == this.getNumOutputs(); final String path; - if (this.sourcePath == null) { + if (this.getInputUrl() == null) { final FileChannel.Instance input = (FileChannel.Instance) inputs[0]; path = input.getSinglePath(); } else { assert inputs.length == 0; - path = this.sourcePath; + path = this.getInputUrl(); } DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0]; flinkExecutor.fee.setParallelism(flinkExecutor.getNumDefaultPartitions()); @@ -109,7 +111,7 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl @Override protected 
ExecutionOperator createCopy() { - return new FlinkObjectFileSource<Type>(sourcePath, this.getType()); + return new FlinkObjectFileSource<Type>(this.getInputUrl(), this.getType()); } @Override diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java index b2809d6..fef4791 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java @@ -32,6 +32,7 @@ public class Mappings { public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList( new TextFileSourceMapping(), new TextFileSinkMapping(), + new ObjectFileSourceMapping(), new ObjectFileSinkMapping(), new MapMapping(), new MapPartitionsMapping(), diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java new file mode 100644 index 0000000..dbc101e --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.ObjectFileSource; +import org.apache.wayang.basic.operators.TextFileSource; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.java.operators.JavaObjectFileSource; +import org.apache.wayang.java.operators.JavaTextFileSource; +import org.apache.wayang.java.platform.JavaPlatform; + +/** + * Mapping from {@link ObjectFileSource} to {@link JavaObjectFileSource}. + */ +public class ObjectFileSourceMapping implements Mapping { + + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + JavaPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern( + "source", + new ObjectFileSource( + null, + DataSetType.none().getDataUnitType().getTypeClass() + ), + false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>( + (matchedOperator, epoch) -> new JavaObjectFileSource(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java index 
4973ffd..4da3b00 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ObjectFileSource; import org.apache.wayang.core.api.exception.WayangException; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; @@ -60,17 +61,17 @@ import java.util.stream.StreamSupport; * * @see JavaObjectFileSink */ -public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecutionOperator { +public class JavaObjectFileSource<T> extends ObjectFileSource<T> implements JavaExecutionOperator { - private final String sourcePath; + public JavaObjectFileSource(ObjectFileSource<T> that) { + super(that); + } public JavaObjectFileSource(DataSetType<T> type) { - this(null, type); + super(null, type); } - public JavaObjectFileSource(String sourcePath, DataSetType<T> type) { - super(type); - this.sourcePath = sourcePath; + super(sourcePath, type); } @Override @@ -83,12 +84,12 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu SequenceFileIterator sequenceFileIterator; final String path; - if (this.sourcePath == null) { + if (this.getInputUrl() == null) { final FileChannel.Instance input = (FileChannel.Instance) inputs[0]; path = input.getSinglePath(); } else { assert inputs.length == 0; - path = this.sourcePath; + path = this.getInputUrl(); } try { final String actualInputPath = FileSystems.findActualSingleInputPath(path); @@ -110,7 +111,7 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu @Override protected ExecutionOperator createCopy() { - 
return new JavaObjectFileSource<>(this.sourcePath, this.getType()); + return new JavaObjectFileSource<>(this.getInputUrl(), this.getType()); } @Override diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java index 5618e78..046fb28 100644 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -33,6 +33,7 @@ public class Mappings { new TextFileSourceMapping(), new TextFileSinkMapping(), new ObjectFileSinkMapping(), + new ObjectFileSourceMapping(), new MapMapping(), new MapPartitionsMapping(), new ReduceByMapping(), diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java new file mode 100644 index 0000000..888b1b4 --- /dev/null +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.CollectionSource; +import org.apache.wayang.basic.operators.ObjectFileSource; +import org.apache.wayang.basic.operators.TextFileSource; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.spark.operators.SparkCollectionSource; +import org.apache.wayang.spark.operators.SparkObjectFileSource; +import org.apache.wayang.spark.operators.SparkTextFileSource; +import org.apache.wayang.spark.platform.SparkPlatform; + +/** + * Mapping from {@link ObjectFileSource} to {@link SparkObjectFileSource}. 
+ */ +public class ObjectFileSourceMapping implements Mapping { + + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern( + "source", + new ObjectFileSource( + null, + DataSetType.none().getDataUnitType().getTypeClass() + ), + false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>( + (matchedOperator, epoch) -> new SparkObjectFileSource(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java index 55820f3..a084bcd 100644 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java @@ -20,6 +20,7 @@ package org.apache.wayang.spark.operators; import org.apache.spark.api.java.JavaRDD; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ObjectFileSource; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -45,19 +46,20 @@ import java.util.List; * * @see SparkObjectFileSink */ -public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExecutionOperator { +public class SparkObjectFileSource<T> extends ObjectFileSource<T> implements SparkExecutionOperator 
{ private final Logger logger = LogManager.getLogger(this.getClass()); - private final String sourcePath; + public SparkObjectFileSource(ObjectFileSource that) { + super(that); + } public SparkObjectFileSource(DataSetType<T> type) { this(null, type); } public SparkObjectFileSource(String sourcePath, DataSetType<T> type) { - super(type); - this.sourcePath = sourcePath; + super(sourcePath, type); } @Override @@ -67,9 +69,9 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe SparkExecutor sparkExecutor, OptimizationContext.OperatorContext operatorContext) { final String sourcePath; - if (this.sourcePath != null) { + if (this.getInputUrl() != null) { assert inputs.length == 0; - sourcePath = this.sourcePath; + sourcePath = this.getInputUrl(); } else { FileChannel.Instance input = (FileChannel.Instance) inputs[0]; sourcePath = input.getSinglePath(); @@ -86,7 +88,7 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe @Override protected ExecutionOperator createCopy() { - return new SparkObjectFileSource<>(this.sourcePath, this.getType()); + return new SparkObjectFileSource<>(this.getInputUrl(), this.getType()); } @Override
