Persist the dataset only if it has not already been cached; in either case, read the cached dataset back from its cache file.
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9c5ee592 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9c5ee592 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9c5ee592 Branch: refs/heads/master Commit: 9c5ee59214a454f7ae25c762bf04bb30bd7982c8 Parents: a1cf7cf Author: Andrew Palumbo <[email protected]> Authored: Fri Mar 25 18:10:18 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Mar 25 18:10:18 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 4 +-- .../drm/CheckpointedFlinkDrm.scala | 32 +++++++++++++++----- 2 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index af508b3..0640ebe 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -359,9 +359,9 @@ object FlinkEngine extends DistributedEngine { } def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { - implicit val tag = ClassTag[K] + implicit val ktag = classTag[K] - generateTypeInformationFromTag(tag) + generateTypeInformationFromTag(ktag) } private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index ea96e88..65acbd6 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -77,14 +77,17 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { + implicit val typeInformation = createTypeInformation[(K,Vector)] + implicit val inputFormat = (ds.getType) if (!isCached) { - cacheFileName = System.nanoTime().toString + cacheFileName = persistanceRootDir + System.nanoTime().toString parallelismDeg = ds.getParallelism isCached = true + persist(ds, cacheFileName) } - implicit val typeInformation = createTypeInformation[(K,Vector)] - val _ds = persist(ds, persistanceRootDir + cacheFileName) + val _ds = readPersistedDataSet(cacheFileName, ds) + datasetWrap(_ds) } @@ -99,12 +102,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], * @param dataset [[DataSet]] to write to disk * @param path File path to write dataset to * @tparam T Type of the [[DataSet]] elements - * @return [[DataSet]] reading the just written file */ - def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = { + def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = { val env = dataset.getExecutionEnvironment val outputFormat = new TypeSerializerOutputFormat[T] - val filePath = new Path(path) outputFormat.setOutputFilePath(filePath) @@ -112,14 +113,29 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], dataset.output(outputFormat) env.execute("FlinkTools persist") + } + + /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent + * operations. 
+ * + * @param path File path to read dataset from + * @param ds persisted ds to retrieve type information and environment forom + * @tparam T key Type of the [[DataSet]] elements + * @return [[DataSet]] reading the just written file + */ + def readPersistedDataSet[T: ClassTag : TypeInformation] + (path: String, ds: DataSet[T]): DataSet[T] = { - val inputFormat = new TypeSerializerInputFormat[T](dataset.getType) + val env = ds.getExecutionEnvironment + val inputFormat = new TypeSerializerInputFormat[T](ds.getType()) + val filePath = new Path(path) inputFormat.setFilePath(filePath) env.createInput(inputFormat) } - // Members declared in org.apache.mahout.math.drm.DrmLike + + // Members declared in org.apache.mahout.math.drm.DrmLike protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
