[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286450#comment-15286450 ]

ASF GitHub Bot commented on FLINK-2259:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63508672
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
 ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import org.apache.flink.util.Collector
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be in (0,1).
    +   *                        This fraction refers to the first element in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set sizes are forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two DataSets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithUniqueId
    +
    +    if ((fraction >= 1) || (fraction <= 0)) {
    +      throw new IllegalArgumentException("sampling fraction outside of (0,1) bounds is nonsensical")
    +    }
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0).apply {
    +        (full: (Long, T), left: (Long, T), collector: Collector[T]) =>
    +        if (left == null) {
    +          collector.collect(full._2)
    +        }
    +    }
    +
    +    Array(leftSplit.map(o => o._2), rightSplit)
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, numbers greater than 1 do not lead to
    +   *                        oversampling. The number of splits is dictated by the length of
    +   *                        this array. The numbers are normalized, e.g. Array(1.0, 2.0)
    +   *                        would yield two data sets with a 33/66% split.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fracArray: Array[Double],
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +
    +    import org.apache.commons.math3.distribution.EnumeratedIntegerDistribution
    +
    +    val eid = new EnumeratedIntegerDistribution((0 to fracArray.length - 1).toArray, fracArray)
    +
    +    eid.reseedRandomGenerator(seed)
    +
    +    val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o))
    +
    +    val splits = fracArray.length
    +    val outputArray = new Array[DataSet[T]]( splits )
    +
    +    for (k <- 0 until splits) {
    +      outputArray(k) = tempDS.filter(o => o._1 == k)
    +                             .map(o => o._2)
    +    }
    +
    +    outputArray
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returned. Each 'testing' set
    +   *                        will be 1/k of the dataset, randomly sampled; the training set will
    +   *                        be the remainder of the dataset. The DataSet is split into kFolds
    +   *                        first, so that no observation will occur in multiple folds.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      kFolds: Int,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, seed)
    +
    +    dataSetArray.map( ds => TrainTestDataSet(ds,
    +                                             dataSetArray.filter(_ != ds)
    +                                                         .reduce(_ union _) ))
    +
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for randomSplit that yields a TrainTestDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be in (0,1).
    +   *                        This fraction refers to the training element in TrainTestSplit.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set sizes are forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestDataSet
    +   */
    +  def trainTestSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double = 0.6,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : TrainTestDataSet[T] = {
    +    val dataSetArray = randomSplit(input, fraction, precise, seed)
    +    TrainTestDataSet(dataSetArray(0), dataSetArray(1))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestHoldoutSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for multiRandomSplit that yields a TrainTestHoldoutDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of three doubles, where the first element specifies the
    +   *                        size of the training set, the second element the testing set, and
    +   *                        the third element the holdout set. These are proportions and will
    +   *                        be normalized internally.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestHoldoutDataSet
    +   */
    +  def trainTestHoldoutSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fracArray: Array[Double] = Array(0.6,0.3,0.1),
    --- End diff --
    
    Why not require a `Tuple3[Double, Double, Double]` here? Then we wouldn't have to
    check that the array has the correct length, and there is one less way for the
    user to shoot himself in the foot ;-)
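
    The suggestion could look like the following minimal sketch (`SplitFractions` and `toFracArray` are hypothetical names, not part of the PR): taking the three proportions as a `Tuple3` makes the arity a compile-time guarantee, so no runtime length check is needed.

    ```scala
    // Hypothetical sketch of the reviewer's suggestion: accept the three
    // proportions as a Tuple3 so the compiler enforces the arity, instead of
    // validating an Array's length at runtime.
    object SplitFractions {
      // Bridge from the type-safe Tuple3 form to the Array form that
      // multiRandomSplit expects internally.
      def toFracArray(fracs: (Double, Double, Double)): Array[Double] =
        Array(fracs._1, fracs._2, fracs._3)
    }
    ```

    A call site would then read `trainTestHoldoutSplit(input, (0.6, 0.3, 0.1))`, and a two-element or four-element argument simply would not compile.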


> Support training Estimators using a (train, validation, test) split of the 
> available data
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-2259
>                 URL: https://issues.apache.org/jira/browse/FLINK-2259
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Theodore Vasiloudis
>            Assignee: Trevor Grant
>            Priority: Minor
>              Labels: ML
>
> When there is an abundance of data available, a good way to train models is 
> to split the available data into 3 parts: Train, Validation and Test.
> We use the Train data to train the model, the Validation part is used to 
> estimate the test error and select hyperparameters, and the Test is used to 
> evaluate the performance of the model, and assess its generalization [1].
> This is a common approach when training Artificial Neural Networks, and a 
> good strategy to choose in data-rich environments. Therefore we should have 
> some support of this data-analysis process in our Estimators.
> [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of 
> statistical learning. Vol. 1. Springer, Berlin: Springer series in 
> statistics, 2001.
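
The splitting idea behind the PR can be illustrated without Flink. The following self-contained sketch (`ProportionalSplit` is an illustrative name, not the PR's code) assigns each element to one of the splits with probability proportional to the normalized fractions, mirroring what `multiRandomSplit` does with an `EnumeratedIntegerDistribution`:

```scala
import scala.util.Random

// Self-contained sketch (no Flink dependency) of proportional random
// splitting: each element is independently assigned to a split with
// probability given by the normalized fractions.
object ProportionalSplit {
  def split[T](data: Seq[T], fractions: Seq[Double], seed: Long): Seq[Seq[T]] = {
    val total = fractions.sum
    // Cumulative normalized bounds, e.g. Seq(0.6, 0.3, 0.1) -> Seq(0.6, 0.9, 1.0)
    val bounds = fractions.map(_ / total).scanLeft(0.0)(_ + _).tail
    val rng = new Random(seed)
    val tagged = data.map { x =>
      val u = rng.nextDouble()
      val idx = bounds.indexWhere(u < _)
      // Guard against floating-point rounding leaving u above the last bound.
      (if (idx < 0) fractions.length - 1 else idx, x)
    }
    fractions.indices.map(k => tagged.collect { case (`k`, x) => x })
  }
}
```

A (train, validation, test) split as described in the issue would then be `ProportionalSplit.split(data, Seq(0.6, 0.3, 0.1), seed)`.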



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)