http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala deleted file mode 100644 index 177a9ee..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ /dev/null @@ -1,573 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, - SingleOutputStreamOperator, GroupedDataStream} -import scala.reflect.ClassTag -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.streaming.api.invokable.operator.MapInvokable -import org.apache.flink.util.Collector -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.streaming.api.invokable.StreamInvokable -import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable } -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.streaming.api.function.sink.SinkFunction -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.windowing.helper.WindowingHelper -import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy } -import org.apache.flink.streaming.api.collector.OutputSelector -import scala.collection.JavaConversions._ -import java.util.HashMap -import org.apache.flink.streaming.api.function.aggregation.SumFunction -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator -import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy - -class DataStream[T](javaStream: JavaStream[T]) { - - /** - * Gets the 
underlying java DataStream object. - */ - def getJavaStream: JavaStream[T] = javaStream - - /** - * Sets the degree of parallelism of this operation. This must be at least 1. - */ - def setParallelism(dop: Int): DataStream[T] = { - javaStream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop) - case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " + - "have " + - "parallelism.") - } - this - } - - /** - * Returns the degree of parallelism of this operation. - */ - def getParallelism: Int = javaStream match { - case op: SingleOutputStreamOperator[_, _] => op.getParallelism - case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" + - " " + - "parallelism.") - } - - def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = { - javaStream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy) - case _ => - throw new UnsupportedOperationException("Only supported for operators.") - } - this - } - - /** - * Creates a new DataStream by merging DataStream outputs of - * the same type with each other. The DataStreams merged using this operator - * will be transformed simultaneously. - * - */ - def merge(dataStreams: DataStream[T]*): DataStream[T] = - javaStream.merge(dataStreams.map(_.getJavaStream): _*) - - /** - * Creates a new ConnectedDataStream by connecting - * DataStream outputs of different types with each other. The - * DataStreams connected using this operator can be used with CoFunctions. - * - */ - def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = - javaStream.connect(dataStream.getJavaStream) - - /** - * Groups the elements of a DataStream by the given key positions (for tuple/array types) to - * be used with grouped operators like grouped reduce or grouped aggregations. - * - */ - def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*) - - /** - * Groups the elements of a DataStream by the given field expressions to - * be used with grouped operators like grouped reduce or grouped aggregations. - * - */ - def groupBy(firstField: String, otherFields: String*): DataStream[T] = - javaStream.groupBy(firstField +: otherFields.toArray: _*) - - /** - * Groups the elements of a DataStream by the given key extractor function to - * be used with grouped operators like grouped reduce or grouped aggregations. - * - */ - def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = { - - val keyExtractor = new KeySelector[T, K] { - val cleanFun = clean(fun) - def getKey(in: T) = cleanFun(in) - } - javaStream.groupBy(keyExtractor) - } - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are broadcast to every parallel instance of the next component. This - * setting only affects how the outputs will be distributed between the - * parallel instances of the next processing operator. - * - */ - def broadcast: DataStream[T] = javaStream.broadcast() - - /** - * Sets the partitioning of the DataStream so that the output values all go to - * the first instance of the next processing operator. Use this setting with care - * since it might cause a serious performance bottleneck in the application. - */ - def global: DataStream[T] = javaStream.global() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are shuffled to the next component.
This setting only affects how the - * outputs will be distributed between the parallel instances of the next - * processing operator. - * - */ - def shuffle: DataStream[T] = javaStream.shuffle() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are forwarded to the local subtask of the next component (whenever - * possible). This is the default partitioner setting. This setting only - * affects how the outputs will be distributed between the parallel - * instances of the next processing operator. - * - */ - def forward: DataStream[T] = javaStream.forward() - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are distributed evenly to the next component. This setting only affects - * how the outputs will be distributed between the parallel instances of - * the next processing operator. - * - */ - def distribute: DataStream[T] = javaStream.distribute() - - /** - * Initiates an iterative part of the program that creates a loop by feeding - * back data streams. To create a streaming iteration the user needs to define - * a transformation that creates two DataStreams. The first one is the output - * that will be fed back to the start of the iteration and the second is the output - * stream of the iterative part. - * <p> - * stepfunction: initialStream => (feedback, output) - * <p> - * A common pattern is to use output splitting to create the feedback and output DataStreams. - * Please refer to the .split(...) method of the DataStream. - * <p> - * By default a DataStream with iteration will never terminate, but the user - * can use the maxWaitTime parameter to set a max waiting time for the iteration head. - * If no data is received within the set time, the stream terminates. - * - * - */ - def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), - maxWaitTimeMillis: Long = 0): DataStream[R] = { - val iterativeStream = javaStream.iterate(maxWaitTimeMillis) - - val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) - iterativeStream.closeWith(feedback.getJavaStream) - output - }
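For illustration, a hedged sketch of this iteration contract: the step function returns a (feedback, output) pair, built here with split/select as the comment above suggests. The decrement logic and all names are invented, and `env` is an assumed StreamExecutionEnvironment with the implicit TypeInformation providers of this package in scope.

    // Subtract 1 per pass; feed positive values back, emit the rest.
    val source: DataStream[Long] = env.generateSequence(1, 10)
    val iterated: DataStream[Long] = source.iterate(
      (input: DataStream[Long]) => {
        val step = input.map(_ - 1)
        val parts = step.split(x => if (x > 0) "feedback" else "output")
        (parts.select("feedback"), parts.select("output"))
      },
      maxWaitTimeMillis = 5000) // terminate the iteration head after 5s without new data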
 - - /** - * Applies an aggregation that gives the current maximum of the data stream at - * the given position. - * - */ - def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) - - /** - * Applies an aggregation that gives the current maximum of the data stream at - * the given field. - * - */ - def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field) - - /** - * Applies an aggregation that gives the current minimum of the data stream at - * the given position. - * - */ - def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) - - /** - * Applies an aggregation that gives the current minimum of the data stream at - * the given field. - * - */ - def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field) - - /** - * Applies an aggregation that sums the data stream at the given position. - * - */ - def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) - - /** - * Applies an aggregation that sums the data stream at the given field. - * - */ - def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field) - - /** - * Applies an aggregation that gives the current minimum element of the data stream by - * the given position. In case of equality, the first element with the minimal value is returned. - * - */ - def minBy(position: Int): DataStream[T] = aggregate(AggregationType - .MINBY, position) - - /** - * Applies an aggregation that gives the current minimum element of the data stream by - * the given field. In case of equality, the first element with the minimal value is returned. - * - */ - def minBy(field: String): DataStream[T] = aggregate(AggregationType - .MINBY, field) - - /** - * Applies an aggregation that gives the current maximum element of the data stream by - * the given position. In case of equality, the first element with the maximal value is returned. - * - */ - def maxBy(position: Int): DataStream[T] = - aggregate(AggregationType.MAXBY, position) - - /** - * Applies an aggregation that gives the current maximum element of the data stream by - * the given field. In case of equality, the first element with the maximal value is returned. - * - */ - def maxBy(field: String): DataStream[T] = - aggregate(AggregationType.MAXBY, field) - - private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { - val position = fieldNames2Indices(javaStream.getType(), Array(field))(0) - aggregate(aggregationType, position) - } - - private def aggregate(aggregationType: AggregationType, position: Int): - DataStream[T] = { - - val jStream = javaStream.asInstanceOf[JavaStream[Product]] - val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] - - val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) - - val reducer = aggregationType match { - case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position). - getTypeClass())) - case _ => new agg.ProductComparableAggregator(aggregationType, true) - } - - val invokable = jStream match { - case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, - groupedStream.getKeySelector()) - case _ => new StreamReduceInvokable(reducer) - } - new DataStream[Product](jStream.transform("aggregation", jStream.getType(), - invokable)).asInstanceOf[DataStream[T]] - } - - /** - * Creates a new DataStream containing the current number (count) of - * received records. - * - */ - def count: DataStream[Long] = new DataStream[java.lang.Long]( - javaStream.count()).asInstanceOf[DataStream[Long]]
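For illustration, a small sketch of these running aggregations on a tuple stream. The sensor data is invented and the implicit TypeInformation providers of this package are assumed to be in scope.

    // (sensorId, temperature) readings
    val readings: DataStream[(Int, Double)] = env.fromElements((1, 19.5), (2, 22.0), (1, 21.3))
    val maxPerSensor = readings.groupBy(0).max(1) // running maximum temperature per sensor
    val hottest = readings.maxBy(1)               // whole element carrying the highest temperature
    val totals = readings.groupBy(0).sum(1)       // running sum per sensor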
 - - /** - * Creates a new DataStream by applying the given function to every element of this DataStream. - */ - def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("Map function must not be null.") - } - val mapper = new MapFunction[T, R] { - val cleanFun = clean(fun) - def map(in: T): R = cleanFun(in) - } - - javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element of this DataStream. - */ - def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = { - if (mapper == null) { - throw new NullPointerException("Map function must not be null.") - } - - javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = { - if (flatMapper == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - javaStream.transform("flatMap", implicitly[TypeInformation[R]], - new FlatMapInvokable[T, R](flatMapper)) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - val flatMapper = new FlatMapFunction[T, R] { - val cleanFun = clean(fun) - def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) } - } - flatMap(flatMapper) - } - - /** - * Creates a new DataStream by applying the given function to every element and flattening - * the results. - */ - def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = { - if (fun == null) { - throw new NullPointerException("FlatMap function must not be null.") - } - val flatMapper = new FlatMapFunction[T, R] { - val cleanFun = clean(fun) - def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect } - } - flatMap(flatMapper) - } - - /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream - * using an associative reduce function. - */ - def reduce(reducer: ReduceFunction[T]): DataStream[T] = { - if (reducer == null) { - throw new NullPointerException("Reduce function must not be null.") - } - javaStream match { - case ds: GroupedDataStream[_] => javaStream.transform("reduce", - javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())) - case _ => javaStream.transform("reduce", javaStream.getType(), - new StreamReduceInvokable[T](reducer)) - } - } - - /** - * Creates a new [[DataStream]] by reducing the elements of this DataStream - * using an associative reduce function. - */ - def reduce(fun: (T, T) => T): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Reduce function must not be null.") - } - val reducer = new ReduceFunction[T] { - val cleanFun = clean(fun) - def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } - } - reduce(reducer) - } - - /** - * Creates a new DataStream that contains only the elements satisfying the given filter predicate. - */ - def filter(filter: FilterFunction[T]): DataStream[T] = { - if (filter == null) { - throw new NullPointerException("Filter function must not be null.") - } - javaStream.filter(filter) - } - - /** - * Creates a new DataStream that contains only the elements satisfying the given filter predicate. - */ - def filter(fun: T => Boolean): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Filter function must not be null.") - } - val filter = new FilterFunction[T] { - val cleanFun = clean(fun) - def filter(in: T) = cleanFun(in) - } - this.filter(filter) - }
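A word-count-style sketch of the functional variants above; the socket host and port are placeholders and implicit TypeInformation is assumed in scope.

    val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.toLowerCase.split("\\W+")) // the T => TraversableOnce[R] flatMap variant
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .groupBy(0)
      .sum(1)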
 - - /** - * Creates a WindowedDataStream that can be used to apply - * transformations like .reduce(...) or aggregations on - * preset chunks (windows) of the data stream. To define the windows one or - * more WindowingHelpers such as Time, Count and - * Delta can be used.</br></br> When applied to a grouped data - * stream, the windows (evictions) and slide sizes (triggers) will be - * computed on a per group basis.</br></br> For more advanced control over - * the trigger and eviction policies please use - * window(List(triggers), List(evicters)). - */ - def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = - javaStream.window(windowingHelper: _*) - - /** - * Creates a WindowedDataStream using the given TriggerPolicy and EvictionPolicy lists. - * Windowing can be used to apply transformations like .reduce(...) or aggregations on - * preset chunks (windows) of the data stream.</br></br>For most common - * use-cases please refer to window(WindowingHelper[_]*). - * - */ - def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): - WindowedDataStream[T] = javaStream.window(triggers, evicters) - - /** - * Operator used for directing tuples to specific named outputs using an - * OutputSelector. Calling this method on an operator creates a new - * SplitDataStream. - */ - def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector) - - /** - * Creates a new SplitDataStream that contains only the elements satisfying the - * given output selector predicate. - */ - def split(fun: T => String): SplitDataStream[T] = { - if (fun == null) { - throw new NullPointerException("OutputSelector must not be null.") - } - val selector = new OutputSelector[T] { - val cleanFun = clean(fun) - def select(in: T): java.lang.Iterable[String] = { - List(cleanFun(in)) - } - } - split(selector) - } - - /** - * Initiates a temporal Join transformation that joins the elements of two - * data streams on key equality over a specified time window. - * - * This method returns a StreamJoinOperator on which the - * .onWindow(..) should be called to define the - * window, and then the .where(..) and .equalTo(..) methods can be used to define - * the join keys. The user can also use the apply method of the returned JoinedStream - * to apply a custom join function. - * - */ - def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = - new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
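To make the join flow concrete, a hedged sketch. The event types, keys and the 5-second window are invented, and onWindow is the timestamp-based variant defined in TemporalOperator later in this commit.

    case class Order(id: Int, time: Long)
    case class Payment(orderId: Int, amount: Double, time: Long)
    // `orders: DataStream[Order]` and `payments: DataStream[Payment]` are assumed.
    val settled: DataStream[(Int, Double)] = orders.join(payments)
      .onWindow(5000, (o: Order) => o.time, (p: Payment) => p.time)
      .where(_.id)
      .equalTo(_.orderId)
      .apply((o, p) => (o.id, p.amount))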
 - - /** - * Initiates a temporal cross transformation that builds all pair - * combinations of elements of both DataStreams, i.e., it builds a Cartesian - * product. - * - * This method returns a StreamCrossOperator on which - * .onWindow(..) should be called to define the - * window. The user can also use the apply method of the returned stream - * to set a custom wrapping for the crossed pairs. - * - */ - def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = - new StreamCrossOperator[T, R](javaStream, stream.getJavaStream) - - /** - * Writes a DataStream to the standard output stream (stdout). For each - * element of the DataStream the result of .toString is - * written. - * - */ - def print(): DataStream[T] = javaStream.print() - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, every millis milliseconds. For - * every element of the DataStream the result of .toString - * is written. - * - */ - def writeAsText(path: String, millis: Long = 0): DataStream[T] = - javaStream.writeAsText(path, millis) - - /** - * Writes a DataStream to the file specified by path in CSV format. The - * writing is performed periodically, every millis milliseconds. For - * every element of the DataStream the result of .toString - * is written. - * - */ - def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = - javaStream.writeAsCsv(path, millis) - - /** - * Adds the given sink to this DataStream. Only streams with sinks added - * will be executed once the StreamExecutionEnvironment.execute(...) - * method is called. - * - */ - def addSink(sinkFunction: SinkFunction[T]): DataStream[T] = - javaStream.addSink(sinkFunction) - - /** - * Adds the given sink to this DataStream. Only streams with sinks added - * will be executed once the StreamExecutionEnvironment.execute(...) - * method is called. - * - */ - def addSink(fun: T => Unit): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Sink function must not be null.") - } - val sinkFunction = new SinkFunction[T] { - val cleanFun = clean(fun) - def invoke(in: T) = cleanFun(in) - } - this.addSink(sinkFunction) - } - -}
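Pulling the pieces of this class together, a minimal end-to-end sketch; the object name and input are illustrative, and the implicits of this package's package object are assumed.

    import org.apache.flink.streaming.api.scala._

    object StreamingWordCount {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements("to be", "or not to be")
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .groupBy(0)
          .sum(1)
          .print() // attaches a sink; only streams with sinks run on execute()
        env.execute("Streaming WordCount")
      }
    }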
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala deleted file mode 100644 index 9e33f80..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } - -/** - * The SplitDataStream represents an operator that has been split using an - * {@link OutputSelector}. Named outputs can be selected using the - * {@link #select} function. To apply a transformation on the whole output simply call - * the appropriate method on this stream. - * - * @param <OUT> - * The type of the output. - */ -class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){ - - /** - * Sets the output names for which the next operator will receive values. - */ - def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala deleted file mode 100644 index a408ec0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import scala.reflect.ClassTag -import org.apache.commons.lang.Validate -import org.apache.flink.api.common.functions.CrossFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.scala.typeutils.CaseClassSerializer -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} -import org.apache.flink.streaming.api.function.co.CrossWindowFunction -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow -import java.util.concurrent.TimeUnit - -class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends - TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { - - override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = { - - val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, - (l: I1, r: I2) => (l, r)) - - val returnType = new CaseClassTypeInfo[(I1, I2)]( - - classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) { - - override def createSerializer: TypeSerializer[(I1, I2)] = { - val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer - } - - new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2]) - } - } - } - } - - val javaStream = input1.connect(input2).addGeneralWindowCombine( - crossWindowFunction, - returnType, windowSize, - slideInterval, timeStamp1, timeStamp2) - - new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream) - } -} -object StreamCrossOperator { - - private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], - javaStream: JavaStream[(I1, I2)]) extends - DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] { - - /** - * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf - * call will be emitted. 
- * - */ - def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { - - val invokable = new CoWindowInvokable[I1, I2, R]( - clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, - op.timeStamp2) - - javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), - invokable) - - javaStream.setType(implicitly[TypeInformation[R]]) - } - - override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = { - every(timeUnit.toMillis(length)) - } - - override def every(length: Long): CrossWindow[I1, I2] = { - val builder = javaStream.getExecutionEnvironment().getStreamGraph() - val invokable = builder.getInvokable(javaStream.getId()) - invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length) - this - } - } - - private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], - crossFunction: (I1, I2) => R): - CrossWindowFunction[I1, I2, R] = { - Validate.notNull(crossFunction, "Cross function must not be null.") - - val crossFun = new CrossFunction[I1, I2, R] { - val cleanFun = op.input1.clean(crossFunction) - - override def cross(first: I1, second: I2): R = { - cleanFun(first, second) - } - } - - new CrossWindowFunction[I1, I2, R](crossFun) - } - -}
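A hedged usage sketch for CrossWindow.apply above; the element types and the 1-second window are invented, and the two input DataStreams are assumed to exist.

    case class Reading(id: Int, time: Long)
    case class Threshold(id: Int, time: Long)
    // `readings: DataStream[Reading]` and `thresholds: DataStream[Threshold]` are assumed.
    val combos: DataStream[(Int, Int)] = readings.cross(thresholds)
      .onWindow(1000, (r: Reading) => r.time, (t: Threshold) => t.time)
      .apply((r, t) => (r.id, t.id)) // custom wrapping instead of the default (r, t) pair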
 http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala deleted file mode 100644 index 394673c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import scala.reflect.ClassTag -import org.apache.commons.lang.Validate -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} -import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction } -import org.apache.flink.util.Collector -import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType - -class StreamExecutionEnvironment(javaEnv: JavaEnv) { - - /** - * Sets the degree of parallelism (DOP) for operations executed through this environment. - * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with - * x parallel instances. This value can be overridden by specific operations using - * [[DataStream.setParallelism]]. - */ - def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = { - javaEnv.setDegreeOfParallelism(degreeOfParallelism) - } - - /** - * Returns the default degree of parallelism for this execution environment. Note that this - * value can be overridden by individual operations using [[DataStream.setParallelism]]. - */ - def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism - - /** - * Sets the maximum time frequency (milliseconds) for the flushing of the - * output buffers. By default the output buffers flush frequently to provide - * low latency and to aid smooth developer experience. Setting the parameter - * can result in three logical modes: - * - * <ul> - * <li> - * A positive value triggers flushing periodically at that interval (in milliseconds)</li> - * <li> - * 0 triggers flushing after every record, thus minimizing latency</li> - * <li> - * -1 triggers flushing only when the output buffer is full, thus maximizing - * throughput</li> - * </ul> - * - */ - def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = { - javaEnv.setBufferTimeout(timeoutMillis) - this - } - - /** - * Gets the default buffer timeout set for this environment. - */ - def getBufferTimeout: Long = javaEnv.getBufferTimeout() - - /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise. The file will be read with the system's default - * character set. - * - */ - def readTextFile(filePath: String): DataStream[String] = - javaEnv.readTextFile(filePath) - - /** - * Creates a DataStream that contains the contents of files created while - * the system watches the given path. The files will be read with the system's - * default character set. The user can specify the monitoring interval in milliseconds, - * and the way file modifications are handled. By default it checks for only new files - * every 100 milliseconds. - * - */ - def readFileStream(streamPath: String, intervalMillis: Long = 100, watchType: WatchType = - WatchType.ONLY_NEW_FILES): DataStream[String] = - javaEnv.readFileStream(streamPath, intervalMillis, watchType) - - /** - * Creates a new DataStream that contains the strings received infinitely - * from the socket. Received strings are decoded by the system's default - * character set. - * - */ - def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] = - javaEnv.socketTextStream(hostname, port, delimiter) - - /** - * Creates a new DataStream that contains the strings received infinitely - * from the socket. Received strings are decoded by the system's default - * character set, using '\n' as the delimiter.
- * - */ - def socketTextStream(hostname: String, port: Int): DataStream[String] = - javaEnv.socketTextStream(hostname, port) - - /** - * Creates a new DataStream that contains a sequence of numbers. - * - */ - def generateSequence(from: Long, to: Long): DataStream[Long] = { - new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)). - asInstanceOf[DataStream[Long]] - } - - /** - * Creates a DataStream that contains the given elements. The elements must all be of the - * same type and must be serializable. - * - * * Note that this operation will result in a non-parallel data source, i.e. a data source with - * a degree of parallelism of one. - */ - def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = { - val typeInfo = implicitly[TypeInformation[T]] - fromCollection(data)(implicitly[ClassTag[T]], typeInfo) - } - - /** - * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable - * because the framework may move the elements into the cluster if needed. - * - * Note that this operation will result in a non-parallel data source, i.e. a data source with - * a degree of parallelism of one. - */ - def fromCollection[T: ClassTag: TypeInformation]( - data: Seq[T]): DataStream[T] = { - Validate.notNull(data, "Data must not be null.") - val typeInfo = implicitly[TypeInformation[T]] - - val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions - .asJavaCollection(data)) - - javaEnv.addSource(sourceFunction, typeInfo) - } - - /** - * Create a DataStream using a user defined source function for arbitrary - * source functionality. By default sources have a parallelism of 1. - * To enable parallel execution, the user defined source should implement - * ParallelSourceFunction or extend RichParallelSourceFunction. - * In these cases the resulting source will have the parallelism of the environment. - * To change this afterwards call DataStreamSource.setParallelism(int) - * - */ - def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = { - Validate.notNull(function, "Function must not be null.") - val cleanFun = StreamExecutionEnvironment.clean(function) - val typeInfo = implicitly[TypeInformation[T]] - javaEnv.addSource(cleanFun, typeInfo) - } - - /** - * Create a DataStream using a user defined source function for arbitrary - * source functionality. - * - */ - def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = { - Validate.notNull(function, "Function must not be null.") - val sourceFunction = new SourceFunction[T] { - val cleanFun = StreamExecutionEnvironment.clean(function) - override def invoke(out: Collector[T]) { - cleanFun(out) - } - } - addSource(sourceFunction) - } - - /** - * Triggers the program execution. The environment will execute all parts of - * the program that have resulted in a "sink" operation. Sink operations are - * for example printing results or forwarding them to a message queue. - * <p> - * The program execution will be logged and displayed with a generated - * default name. - * - */ - def execute() = javaEnv.execute() - - /** - * Triggers the program execution. The environment will execute all parts of - * the program that have resulted in a "sink" operation. Sink operations are - * for example printing results or forwarding them to a message queue. 
- * <p> - * The program execution will be logged and displayed with the provided name - * - */ - def execute(jobName: String) = javaEnv.execute(jobName) - - /** - * Creates the plan with which the system will execute the program, and - * returns it as a String using a JSON representation of the execution data - * flow graph. Note that this needs to be called, before the plan is - * executed. - * - */ - def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON - -} - -object StreamExecutionEnvironment { - - private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) - f - } - - /** - * Creates an execution environment that represents the context in which the program is - * currently executed. If the program is invoked standalone, this method returns a local - * execution environment. If the program is invoked from within the command line client - * to be submitted to a cluster, this method returns the execution environment of this cluster. - */ - def getExecutionEnvironment: StreamExecutionEnvironment = { - new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment) - } - - /** - * Creates a local execution environment. The local execution environment will run the program in - * a multi-threaded fashion in the same JVM as the environment was created in. The default degree - * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads). - */ - def createLocalEnvironment( - degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): - StreamExecutionEnvironment = { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism)) - } - - /** - * Creates a remote execution environment. The remote environment sends (parts of) the program to - * a cluster for execution. Note that all file paths used in the program must be accessible from - * the cluster. The execution will use the cluster's default degree of parallelism, unless the - * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]]. - * - * @param host The host name or address of the master (JobManager), - * where the program should be executed. - * @param port The port of the master (JobManager), where the program should be executed. - * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the - * program uses - * user-defined functions, user-defined input formats, or any libraries, - * those must be - * provided in the JAR files. - */ - def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): - StreamExecutionEnvironment = { - new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)) - } - - /** - * Creates a remote execution environment. The remote environment sends (parts of) the program - * to a cluster for execution. Note that all file paths used in the program must be accessible - * from the cluster. The execution will use the specified degree of parallelism. - * - * @param host The host name or address of the master (JobManager), - * where the program should be executed. - * @param port The port of the master (JobManager), where the program should be executed. - * @param degreeOfParallelism The degree of parallelism to use during the execution. - * @param jarFiles The JAR files with code that needs to be shipped to the cluster. 
If the - * program uses - * user-defined functions, user-defined input formats, or any libraries, - * those must be - * provided in the JAR files. - */ - def createRemoteEnvironment( - host: String, - port: Int, - degreeOfParallelism: Int, - jarFiles: String*): StreamExecutionEnvironment = { - val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*) - javaEnv.setDegreeOfParallelism(degreeOfParallelism) - new StreamExecutionEnvironment(javaEnv) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala deleted file mode 100644 index 1bd1bfb..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.scala - -import scala.Array.canBuildFrom -import scala.reflect.ClassTag -import org.apache.commons.lang.Validate -import org.apache.flink.api.common.functions.JoinFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.operators.Keys -import org.apache.flink.api.scala.typeutils.CaseClassSerializer -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.function.co.JoinWindowFunction -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.util.keys.KeySelectorUtil -import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow -import java.util.concurrent.TimeUnit - -class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends -TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { - - override def createNextWindowOperator() = { - new StreamJoinOperator.JoinWindow[I1, I2](this) - } -} - -object StreamJoinOperator { - - class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends - TemporalWindow[JoinWindow[I1, I2]] { - - private[flink] val type1 = op.input1.getType() - - /** - * Continues a temporal Join transformation by defining - * the fields in the first stream to be used as keys for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. - */ - def where(fields: Int*) = { - new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type1), type1)) - } - - /** - * Continues a temporal Join transformation by defining - * the fields in the first stream to be used as keys for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. - */ - def where(firstField: String, otherFields: String*) = - new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1)) - - /** - * Continues a temporal Join transformation by defining - * the keyselector function that will be used to extract keys from the first stream - * for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. - */ - def where[K: TypeInformation](fun: (I1) => K) = { - val keyType = implicitly[TypeInformation[K]] - val keyExtractor = new KeySelector[I1, K] { - val cleanFun = op.input1.clean(fun) - def getKey(in: I1) = cleanFun(in) - } - new JoinPredicate[I1, I2](op, keyExtractor) - } - - override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = { - every(timeUnit.toMillis(length)) - } - - override def every(length: Long): JoinWindow[I1, I2] = { - op.slideInterval = length - this - } - - } - - class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], - private[flink] val keys1: KeySelector[I1, _]) { - private[flink] var keys2: KeySelector[I2, _] = null - private[flink] val type2 = op.input2.getType() - - /** - * Creates a temporal join transformation by defining the second join key. 
- * The returned transformation wraps each joined element pair in a tuple: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) - */ - def equalTo(fields: Int*): JoinedStream[I1, I2] = { - finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type2), type2)) - } - - /** - * Creates a temporal join transformation by defining the second join key. - * The returned transformation wraps each joined element pair in a tuple: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) - */ - def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = - finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2)) - - /** - * Creates a temporal join transformation by defining the second join key. - * The returned transformation wraps each joined element pair in a tuple: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) - */ - def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = { - val keyType = implicitly[TypeInformation[K]] - val keyExtractor = new KeySelector[I2, K] { - val cleanFun = op.input1.clean(fun) - def getKey(in: I2) = cleanFun(in) - } - finish(keyExtractor) - } - - private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = { - this.keys2 = keys2 - new JoinedStream[I1, I2](this, createJoinOperator()) - } - - private def createJoinOperator(): JavaStream[(I1, I2)] = { - - val returnType = new CaseClassTypeInfo[(I1, I2)]( - - classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) { - - override def createSerializer: TypeSerializer[(I1, I2)] = { - val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer - } - - new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2]) - } - } - } - } - - return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) - .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), - returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) - } - } - - class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends - DataStream[(I1, I2)](javaStream) { - - private val op = jp.op - - /** - * Sets a wrapper for the joined elements. For each joined pair, the result of the - * udf call will be emitted.
- */ - def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { - - val invokable = new CoWindowInvokable[I1, I2, R]( - clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, - op.timeStamp2) - - javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), - invokable) - - javaStream.setType(implicitly[TypeInformation[R]]) - } - } - - private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], - joinFunction: (I1, I2) => R) = { - Validate.notNull(joinFunction, "Join function must not be null.") - - val joinFun = new JoinFunction[I1, I2, R] { - - val cleanFun = jp.op.input1.clean(joinFunction) - - override def join(first: I1, second: I2): R = { - cleanFun(first, second) - } - } - - new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala deleted file mode 100644 index fd3a4a9..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp } -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow -import org.apache.flink.streaming.api.windowing.helper.Timestamp -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._ - -abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]]( - i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) { - - def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = { - val timeStamp1 = getTS(ts1) - val timeStamp2 = getTS(ts2) - onWindow(length, timeStamp1, timeStamp2, startTime) - } - - def getTS[R](ts: R => Long): Timestamp[R] = { - new Timestamp[R] { - val cleanFun = clean(ts, true) - def getTimestamp(in: R) = cleanFun(in) - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala deleted file mode 100644 index 5c734bf..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.scala - -import scala.Array.canBuildFrom -import scala.collection.JavaConversions.iterableAsScalaIterable -import scala.reflect.ClassTag - -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator -import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType -import org.apache.flink.streaming.api.function.aggregation.SumFunction -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.windowing.helper.WindowingHelper -import org.apache.flink.streaming.api.windowing.helper._ -import org.apache.flink.util.Collector - -class WindowedDataStream[T](javaStream: JavaWStream[T]) { - - /** - * Defines the slide size (trigger frequency) for the windowed data stream. - * This controls how often the user defined function will be triggered on - * the window. - */ - def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = - javaStream.every(windowingHelper: _*) - - /** - * Groups the elements of the WindowedDataStream using the given - * field positions. The window sizes (evictions) and slide sizes - * (triggers) will be calculated on the whole stream (in a central fashion), - * but the user defined functions will be applied on a per group basis. - * </br></br> To get windows and triggers on a per group basis apply the - * DataStream.window(...) operator on an already grouped data stream. - * - */ - def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*) - - /** - * Groups the elements of the WindowedDataStream using the given - * field expressions. The window sizes (evictions) and slide sizes - * (triggers) will be calculated on the whole stream (in a central fashion), - * but the user defined functions will be applied on a per group basis. - * </br></br> To get windows and triggers on a per group basis apply the - * DataStream.window(...) operator on an already grouped data stream. - * - */ - def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] = - javaStream.groupBy(firstField +: otherFields.toArray: _*) - - /** - * Groups the elements of the WindowedDataStream using the given - * KeySelector function. The window sizes (evictions) and slide sizes - * (triggers) will be calculated on the whole stream (in a central fashion), - * but the user defined functions will be applied on a per group basis. - * </br></br> To get windows and triggers on a per group basis apply the - * DataStream.window(...) operator on an already grouped data stream. - * - */ - def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = { - - val keyExtractor = new KeySelector[T, K] { - val cleanFun = clean(fun) - def getKey(in: T) = cleanFun(in) - } - javaStream.groupBy(keyExtractor) - } - - /** - * Applies a reduce transformation on the windowed data stream by reducing - * the current window at every trigger. 
- * - */ - def reduce(reducer: ReduceFunction[T]): DataStream[T] = { - if (reducer == null) { - throw new NullPointerException("Reduce function must not be null.") - } - javaStream.reduce(reducer) - } - - /** - * Applies a reduce transformation on the windowed data stream by reducing - * the current window at every trigger. - * - */ - def reduce(fun: (T, T) => T): DataStream[T] = { - if (fun == null) { - throw new NullPointerException("Reduce function must not be null.") - } - val reducer = new ReduceFunction[T] { - val cleanFun = clean(fun) - def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } - } - reduce(reducer) - } - - /** - * Applies a reduceGroup transformation on the windowed data stream by reducing - * the current window at every trigger. In contrast with the simple binary reduce operator, - * groupReduce exposes the whole window through the Iterable interface. - * <br/> - * <br/> - * Whenever possible, try to use reduce instead of groupReduce for increased efficiency. - */ - def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): - DataStream[R] = { - if (reducer == null) { - throw new NullPointerException("GroupReduce function must not be null.") - } - javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]) - } - - /** - * Applies a reduceGroup transformation on the windowed data stream by reducing - * the current window at every trigger. In contrast with the simple binary reduce operator, - * groupReduce exposes the whole window through the Iterable interface. - * <br/> - * <br/> - * Whenever possible, try to use reduce instead of groupReduce for increased efficiency. - */ - def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): - DataStream[R] = { - if (fun == null) { - throw new NullPointerException("GroupReduce function must not be null.") - } - val reducer = new GroupReduceFunction[T, R] { - val cleanFun = clean(fun) - def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) } - } - reduceGroup(reducer) - } - - /** - * Applies an aggregation that gives the maximum of the elements in the window at - * the given position. - * - */ - def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) - - /** - * Applies an aggregation that gives the maximum of the elements in the window at - * the given field. - * - */ - def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field) - - /** - * Applies an aggregation that gives the minimum of the elements in the window at - * the given position. - * - */ - def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) - - /** - * Applies an aggregation that gives the minimum of the elements in the window at - * the given field. - * - */ - def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field) - - /** - * Applies an aggregation that sums the elements in the window at the given position. - * - */ - def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) - - /** - * Applies an aggregation that sums the elements in the window at the given field. - * - */ - def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field) - - /** - * Applies an aggregation that gives the maximum element of the window by - * the given position. In case of a tie, the first element is returned.
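The two reduce flavors above differ mainly in what the user function sees: reduce combines pairs of elements, while reduceGroup receives the whole window as one Iterable (at the cost of buffering it). A minimal sketch, reusing the windowed pair-stream setup from the previous sketch (Count is again the assumed Java helper):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.helper.Count
    import org.apache.flink.util.Collector

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val windowed = env.fromElements(("a", 1), ("b", 2), ("a", 3))
      .window(Count.of(100))

    // Binary reduce: sum the counts, two elements at a time (preferred).
    val sums = windowed.reduce((a, b) => (a._1, a._2 + b._2))

    // Group reduce: the whole window arrives as one Iterable; here we
    // just emit its size. Prefer reduce when both would work.
    val windowSizes = windowed.reduceGroup(
      (in: Iterable[(String, Int)], out: Collector[Int]) => out.collect(in.size))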
- * - */ - def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, - position) - - /** - * Applies an aggregation that gives the maximum element of the window by - * the given field. In case of a tie, the first element is returned. - * - */ - def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, - field) - - /** - * Applies an aggregation that gives the minimum element of the window by - * the given position. In case of a tie, the first element is returned. - * - */ - def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, - position) - - /** - * Applies an aggregation that gives the minimum element of the window by - * the given field. In case of a tie, the first element is returned. - * - */ - def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, - field) - - private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { - val position = fieldNames2Indices(javaStream.getType(), Array(field))(0) - aggregate(aggregationType, position) - } - - def aggregate(aggregationType: AggregationType, position: Int): - DataStream[T] = { - - val jStream = javaStream.asInstanceOf[JavaWStream[Product]] - val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] - - val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) - - val reducer = aggregationType match { - case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( - outType.getTypeAt(position).getTypeClass())) - case _ => new agg.ProductComparableAggregator(aggregationType, true) - } - - new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala deleted file mode 100644 index 222eb6d..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
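The private aggregate implementation above explains the restriction on the positional and named variants: the stream is cast to Product, so they only work on tuples and case classes, and maxBy/minBy keep the whole element rather than a single field. A small illustration; Sale is an invented case class and Count is again the assumed Java helper:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.helper.Count

    case class Sale(shop: String, amount: Double)   // hypothetical

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val windowedSales = env.fromElements(Sale("a", 9.5), Sale("b", 12.0))
      .window(Count.of(10))

    // "amount" is resolved to position 1 via fieldNames2Indices.
    val biggest = windowedSales.maxBy("amount")   // keeps the whole Sale
    val maxAmount = windowedSales.max(1)          // running maximum of field 1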
- */ - -package org.apache.flink.streaming.api - -import _root_.scala.reflect.ClassTag -import language.experimental.macros -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } -import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } -import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } - -package object scala { - // We have this here so that we always have generated TypeInformationS when - // using the Scala API - implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] - - implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = - new DataStream[R](javaStream) - - implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = - new WindowedDataStream[R](javaWStream) - - implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = - new SplitDataStream[R](javaStream) - - implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): - ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) - - implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] = - StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq) - - - private[flink] def fieldNames2Indices( - typeInfo: TypeInformation[_], - fields: Array[String]): Array[Int] = { - typeInfo match { - case ti: CaseClassTypeInfo[_] => - val result = ti.getFieldIndices(fields) - - if (result.contains(-1)) { - throw new IllegalArgumentException("Fields '" + fields.mkString(", ") + - "' are not valid for '" + ti.toString + "'.") - } - - result - - case _ => - throw new UnsupportedOperationException("Specifying fields by name is only" + - "supported on Case Classes (for now).") - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala deleted file mode 100644 index eedee0e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala.windowing - -import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } -import org.apache.commons.lang.Validate -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction - -object Delta { - - /** - * Creates a delta helper representing a delta trigger or eviction policy. - * <br/><br/> This policy calculates a delta between the data point which - * triggered last and the currently arrived data point. It triggers if the - * delta is higher than a specified threshold. <br/><br/> When used for - * eviction, this policy starts from the first element of the - * buffer and removes all elements from the buffer which have a higher delta - * than the threshold. As soon as there is an element with a lower delta, - * the eviction stops. - */ - def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = { - Validate.notNull(deltaFunction, "Delta function must not be null") - val df = new DeltaFunction[T] { - val cleanFun = clean(deltaFunction) - override def getDelta(first: T, second: T) = cleanFun(first, second) - } - JavaDelta.of(threshold, df, initVal) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala deleted file mode 100644 index 9a69369..0000000 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala.windowing - -import java.util.concurrent.TimeUnit -import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } - -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.windowing.helper.Timestamp -import org.apache.commons.lang.Validate - -object Time { - - /** - * Creates a helper representing a time trigger which triggers every given - * length (slide size) or a time eviction which evicts all elements older - * than length (window size) using system time.
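Since Delta.of returns the Java Delta helper, which is a WindowingHelper, it plugs directly into window() or every(). A sketch with an invented stream of Double readings:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.windowing.Delta

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val readings = env.fromElements(0.0, 0.2, 0.9, 1.0)

    // Trigger/evict when a reading differs from the last triggering
    // reading by more than 0.5; 0.0 is the initVal that seeds the
    // first comparison.
    val deltaWindows = readings.window(
      Delta.of(0.5, (last: Double, cur: Double) => math.abs(cur - last), 0.0))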
- * - */ - def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = - JavaTime.of(windowSize, timeUnit) - - /** - * Creates a helper representing a time trigger which triggers every given - * length (slide size) or a time eviction which evicts all elements older - * than length (window size) using a user defined timestamp extractor. - * - */ - def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = { - Validate.notNull(timestamp, "Timestamp must not be null.") - val ts = new Timestamp[R] { - val fun = clean(timestamp, true) - override def getTimestamp(in: R) = fun(in) - } - JavaTime.of(windowSize, ts, startTime) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml deleted file mode 100644 index 386ef73..0000000 --- a/flink-addons/flink-streaming/pom.xml +++ /dev/null @@ -1,75 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
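The two Time.of variants above map to wall-clock windows and extractor-based (event-style) time respectively, with the startTime default of 0 anchoring the alignment. A short sketch; Event and its ts field are invented:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.windowing.Time

    case class Event(id: Int, ts: Long)   // hypothetical

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(Event(1, 100L), Event(2, 4200L))

    // Wall-clock window of 5 seconds:
    val byWallClock = events.window(Time.of(5, TimeUnit.SECONDS))

    // Same length measured by the extracted timestamp, starting at t=0:
    val byEventTime = events.window(Time.of(5000L, (e: Event) => e.ts))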
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-addons</artifactId> - <version>0.9-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-streaming-parent</artifactId> - <name>flink-streaming</name> - <packaging>pom</packaging> - - <modules> - <module>flink-streaming-core</module> - <module>flink-streaming-scala</module> - <module>flink-streaming-examples</module> - <module>flink-streaming-connectors</module> - </modules> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-compiler</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-tachyon/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-tachyon/pom.xml b/flink-addons/flink-tachyon/pom.xml deleted file mode 100644 index de36546..0000000 --- a/flink-addons/flink-tachyon/pom.xml +++ /dev/null @@ -1,117 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-addons</artifactId> - <version>0.9-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-tachyon</artifactId> - <name>flink-tachyon</name> - - <packaging>jar</packaging> - - <!-- - This is a Hadoop2 only flink module. 
- --> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.tachyonproject</groupId> - <artifactId>tachyon</artifactId> - <version>0.5.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.tachyonproject</groupId> - <artifactId>tachyon</artifactId> - <version>0.5.0</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-util</artifactId> - <version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$--> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - </dependencies> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - <version>7.6.8.v20121106</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - <version>7.6.8.v20121106</version> - <scope>test</scope> - </dependency> - </dependencies> - </dependencyManagement> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java deleted file mode 100644 index 7318894..0000000 --- a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tachyon; - - -import org.apache.commons.io.IOUtils; -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.examples.java.wordcount.WordCount; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.StringWriter; - -/** - * This test should logically be located in the 'flink-runtime' tests. However, this project - * already has all the required dependencies (flink-java-examples). Also, the DopOneTestEnvironment is here. - */ -public class HDFSTest { - - private String hdfsURI; - private MiniDFSCluster hdfsCluster; - private org.apache.hadoop.fs.Path hdPath; - private org.apache.hadoop.fs.FileSystem hdfs; - - @Before - public void createHDFS() { - try { - Configuration hdConf = new Configuration(); - - File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); - hdfsCluster = builder.build(); - - hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; - - hdPath = new org.apache.hadoop.fs.Path("/test"); - hdfs = hdPath.getFileSystem(hdConf); - FSDataOutputStream stream = hdfs.create(hdPath); - for(int i = 0; i < 10; i++) { - stream.write("Hello HDFS\n".getBytes()); - } - stream.close(); - - } catch(Throwable e) { - e.printStackTrace(); - Assert.fail("Test failed " + e.getMessage()); - } - } - - @After - public void destroyHDFS() { - try { - hdfs.delete(hdPath, false); - hdfsCluster.shutdown(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - - @Test - public void testHDFS() { - - Path file = new Path(hdfsURI + hdPath); - org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result"); - try { - FileSystem fs = file.getFileSystem(); - Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); - new TachyonFileSystemWrapperTest.DopOneTestEnvironment(); - try { - WordCount.main(new String[]{file.toString(), result.toString()}); - } catch(Throwable t) { - t.printStackTrace(); - Assert.fail("Test failed with " + t.getMessage()); - } - Assert.assertTrue("No result file present", hdfs.exists(result)); - // validate output: - org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result); - StringWriter writer = new StringWriter(); - IOUtils.copy(inStream, writer); - String resultString = writer.toString(); - - Assert.assertEquals("hdfs 10\n" + - "hello 10\n", resultString); - inStream.close(); - - } catch (IOException e) { - e.printStackTrace(); - Assert.fail("Error in test: " + e.getMessage() ); - } - } - - @Test - public void testAvroOut() { - String type = "one"; - AvroOutputFormat<String> avroOut = - new
AvroOutputFormat<String>( String.class ); - - org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest"); - - avroOut.setOutputFilePath(new Path(result.toString())); - avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); - avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); - - try { - avroOut.open(0, 2); - avroOut.writeRecord(type); - avroOut.close(); - - avroOut.open(1, 2); - avroOut.writeRecord(type); - avroOut.close(); - - - Assert.assertTrue("No result file present", hdfs.exists(result)); - FileStatus[] files = hdfs.listStatus(result); - Assert.assertEquals(2, files.length); - for(FileStatus file : files) { - Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); - } - - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } -}