zentol closed pull request #6042: [FLINK-9400] normalize import statement style for flink-scala URL: https://github.com/apache/flink/pull/6042
This PR was merged from a forked repository. Because GitHub hides the original diff of a fork's pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala index aa6b47b9649..4510e52e6fd 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala @@ -23,9 +23,9 @@ import org.apache.flink.annotation.{Internal, Public} import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.functions.{CoGroupFunction, Partitioner, RichCoGroupFunction} import org.apache.flink.api.common.operators.{Keys, Order} +import org.apache.flink.api.common.operators.Keys.ExpressionKeys import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType -import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.util.Collector diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 71037c3ae1d..6ba6455c17f 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} -import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec} +import org.apache.flink.api.common.operators.{Order, ResourceSpec} +import 
org.apache.flink.api.common.operators.Keys.{ExpressionKeys, SelectorFunctionKeys} import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod @@ -32,7 +33,6 @@ import org.apache.flink.api.java.Utils.CountHelper import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat} -import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.java.typeutils.TupleTypeInfoBase @@ -837,7 +837,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } wrap(new DistinctOperator[T]( javaSet, - new Keys.SelectorFunctionKeys[T, K]( + new SelectorFunctionKeys[T, K]( keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]), getCallLocationName())) } @@ -865,7 +865,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def distinct(fields: Int*): DataSet[T] = { wrap(new DistinctOperator[T]( javaSet, - new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType), + new ExpressionKeys[T](fields.toArray, javaSet.getType), getCallLocationName())) } @@ -888,7 +888,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def distinct(firstField: String, otherFields: String*): DataSet[T] = { wrap(new DistinctOperator[T]( javaSet, - new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), + new ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), getCallLocationName())) } @@ -911,7 +911,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def getKey(in: T) = cleanFun(in) } new GroupedDataSet[T](this, - new Keys.SelectorFunctionKeys[T, K](keyExtractor, javaSet.getType, keyType)) + new SelectorFunctionKeys[T, K](keyExtractor, 
javaSet.getType, keyType)) } /** @@ -926,7 +926,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def groupBy(fields: Int*): GroupedDataSet[T] = { new GroupedDataSet[T]( this, - new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType)) + new ExpressionKeys[T](fields.toArray, javaSet.getType)) } /** @@ -940,11 +940,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T] = { new GroupedDataSet[T]( this, - new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType)) + new ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType)) } // public UnsortedGrouping<T> groupBy(String... fields) { - // new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType())); + // new UnsortedGrouping<T>(this, new ExpressionKeys<T>(fields, getType())); // } // -------------------------------------------------------------------------------------------- @@ -1420,7 +1420,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, PartitionMethod.HASH, - new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType), + new ExpressionKeys[T](fields.toArray, javaSet.getType), getCallLocationName()) wrap(op) } @@ -1435,7 +1435,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, PartitionMethod.HASH, - new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), + new ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), getCallLocationName()) wrap(op) } @@ -1454,7 +1454,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, PartitionMethod.HASH, - new Keys.SelectorFunctionKeys[T, K]( + new SelectorFunctionKeys[T, K]( keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]), @@ -1474,7 +1474,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, 
PartitionMethod.RANGE, - new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType), + new ExpressionKeys[T](fields.toArray, javaSet.getType), getCallLocationName()) wrap(op) } @@ -1490,7 +1490,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, PartitionMethod.RANGE, - new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), + new ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType), getCallLocationName()) wrap(op) } @@ -1510,7 +1510,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, PartitionMethod.RANGE, - new Keys.SelectorFunctionKeys[T, K]( + new SelectorFunctionKeys[T, K]( keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]), @@ -1528,7 +1528,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataSet[T] = { val op = new PartitionOperator[T]( javaSet, - new Keys.ExpressionKeys[T](Array[Int](field), javaSet.getType), + new ExpressionKeys[T](Array[Int](field), javaSet.getType), partitioner, implicitly[TypeInformation[K]], getCallLocationName()) @@ -1547,7 +1547,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { : DataSet[T] = { val op = new PartitionOperator[T]( javaSet, - new Keys.ExpressionKeys[T](Array[String](field), javaSet.getType), + new ExpressionKeys[T](Array[String](field), javaSet.getType), partitioner, implicitly[TypeInformation[K]], getCallLocationName()) @@ -1574,7 +1574,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val op = new PartitionOperator[T]( javaSet, - new Keys.SelectorFunctionKeys[T, K]( + new SelectorFunctionKeys[T, K]( keyExtractor, javaSet.getType, keyType), @@ -1639,7 +1639,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val keyType = implicitly[TypeInformation[K]] new PartitionSortedDataSet[T]( new SortPartitionOperator[T](javaSet, - new Keys.SelectorFunctionKeys[T, K]( + new 
SelectorFunctionKeys[T, K]( keyExtractor, javaSet.getType, keyType), diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index 72608b374b0..e693b806414 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -22,10 +22,10 @@ import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, Partitioner, ReduceFunction} import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint import org.apache.flink.api.common.operators.{Keys, Order} +import org.apache.flink.api.common.operators.Keys.{ExpressionKeys, SelectorFunctionKeys} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} -import Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala.operators.ScalaAggregateOperator @@ -54,7 +54,7 @@ class GroupedDataSet[T: ClassTag]( private var partitioner : Partitioner[_] = _ - private var groupSortKeySelector: Option[Keys.SelectorFunctionKeys[T, _]] = None + private var groupSortKeySelector: Option[SelectorFunctionKeys[T, _]] = None /** * Adds a secondary sort key to this [[GroupedDataSet]]. This will only have an effect if you @@ -63,7 +63,7 @@ class GroupedDataSet[T: ClassTag]( * This only works on Tuple DataSets. 
*/ def sortGroup(field: Int, order: Order): GroupedDataSet[T] = { - if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + if (keys.isInstanceOf[SelectorFunctionKeys[_, _]]) { throw new InvalidProgramException("KeySelector grouping keys and field index group-sorting " + "keys cannot be used together.") } @@ -90,7 +90,7 @@ class GroupedDataSet[T: ClassTag]( throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not" + "supported.") } - if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + if (keys.isInstanceOf[SelectorFunctionKeys[_, _]]) { throw new InvalidProgramException("KeySelector grouping keys and field expression " + "group-sorting keys cannot be used together.") } @@ -113,7 +113,7 @@ class GroupedDataSet[T: ClassTag]( throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not" + "supported.") } - if (!keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + if (!keys.isInstanceOf[SelectorFunctionKeys[_, _]]) { throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector " + "grouping.") } @@ -123,7 +123,7 @@ class GroupedDataSet[T: ClassTag]( val keyExtractor = new KeySelector[T, K] { def getKey(in: T) = fun(in) } - groupSortKeySelector = Some(new Keys.SelectorFunctionKeys[T, K]( + groupSortKeySelector = Some(new SelectorFunctionKeys[T, K]( keyExtractor, set.javaSet.getType, keyType)) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index 70ca412b9c4..7fff3d27b5d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -23,11 +23,10 @@ import java.util.regex.{Matcher, Pattern} import org.apache.flink.annotation.{Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig 
-import org.apache.flink.api.common.operators.Keys +import org.apache.flink.api.common.operators.Keys.ExpressionKeys import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, InvalidFieldReferenceException, TypeComparatorBuilder} import org.apache.flink.api.common.typeutils._ -import Keys.ExpressionKeys import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import scala.collection.JavaConverters._ diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala index 462007567d7..b44be3ee3b8 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala @@ -21,9 +21,9 @@ package org.apache.flink.api.scala import org.apache.flink.annotation.Internal import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.operators.Keys +import org.apache.flink.api.common.operators.Keys.{ExpressionKeys, SelectorFunctionKeys} import org.apache.flink.api.java.functions.KeySelector -import Keys.ExpressionKeys import org.apache.flink.api.common.typeinfo.TypeInformation /** @@ -87,7 +87,7 @@ private[flink] abstract class UnfinishedKeyPairOperation[L, R, O]( val cleanFun = leftInput.clean(fun) def getKey(in: L) = cleanFun(in) } - val leftKey = new Keys.SelectorFunctionKeys[L, K](keyExtractor, leftInput.getType, keyType) + val leftKey = new SelectorFunctionKeys[L, K](keyExtractor, leftInput.getType, keyType) new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey) } } @@ -136,7 +136,7 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O]( val cleanFun = unfinished.leftInput.clean(fun) def getKey(in: R) = cleanFun(in) } - val rightKey = new Keys.SelectorFunctionKeys[R, K]( + val rightKey = new 
SelectorFunctionKeys[R, K]( keyExtractor, unfinished.rightInput.getType, keyType) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services