https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213319#comment-15213319
ASF GitHub Bot commented on MAHOUT-1817:
----------------------------------------
Github user andrewpalumbo commented on a diff in the pull request:
https://github.com/apache/mahout/pull/203#discussion_r57523033
--- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---
@@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
+ /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+ * the dataset to the filesystem and read it back when cache is called
*/
def cache() = {
if (!isCached) {
- cacheFileName = System.nanoTime().toString
+ cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
+ persist(ds, cacheFileName)
}
- implicit val typeInformation = createTypeInformation[(K,Vector)]
+ val _ds = readPersistedDataSet(cacheFileName, ds)
- val _ds = persist(ds, persistanceRootDir + cacheFileName)
+ // We may want to look more closely at this:
+ // since we've cached a drm, triggering a computation
+ // it may not make sense to keep the same parallelism degree
+ if (!(parallelismDeg == _ds.getParallelism)) {
+ _ds.setParallelism(parallelismDeg).rebalance()
--- End diff --
Not sure what to do here; this `rebalance` call seems to be adding some time
to the caching call.
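The extra time is plausible: the `DataSet.rebalance()` Javadoc warns that the operation shuffles the whole DataSet over the network. As a rough, Flink-free illustration, the hypothetical `roundRobinRebalance` helper below models a round-robin redistribution in plain Scala and counts how many records land in a different partition. The per-sender counter starting at 0 is a simplifying assumption, not Flink's exact channel selection.

```scala
// Hypothetical, Flink-free model of a round-robin rebalance.
// parts(i) holds the records currently in partition i.
// Returns the rebalanced partitions and how many records changed partition.
def roundRobinRebalance[T](parts: Vector[Vector[T]], parallelism: Int): (Vector[Vector[T]], Int) = {
  require(parallelism > 0, "parallelism must be positive")
  val targets = Vector.fill(parallelism)(Vector.newBuilder[T])
  var moved = 0
  for ((part, src) <- parts.zipWithIndex) {
    // Assumption: each sending partition keeps its own counter, starting at 0.
    var next = 0
    for (rec <- part) {
      targets(next) += rec
      if (next != src) moved += 1
      next = (next + 1) % parallelism
    }
  }
  (targets.map(_.result()), moved)
}

// Skewed input: all records sit in partition 0.
val (fromSkewed, movedSkewed) =
  roundRobinRebalance(Vector(Vector(1, 2, 3, 4, 5, 6), Vector.empty[Int]), 2)
println(fromSkewed)    // Vector(Vector(1, 3, 5), Vector(2, 4, 6))
println(movedSkewed)   // 3

// Already balanced input: records still move.
val (fromBalanced, movedBalanced) =
  roundRobinRebalance(Vector(Vector(1, 2), Vector(3, 4)), 2)
println(fromBalanced)  // Vector(Vector(1, 3), Vector(2, 4))
println(movedBalanced) // 2
```

Under these assumptions, even an already even dataset has half its records moved, so the rebalance pays a shuffle cost whether or not the read-back data was skewed. Skipping the call when the partitions are already balanced, or accepting the read-back parallelism, are trade-offs worth measuring.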
> Implement caching in Flink Bindings
> -----------------------------------
>
> Key: MAHOUT-1817
> URL: https://issues.apache.org/jira/browse/MAHOUT-1817
> Project: Mahout
> Issue Type: New Feature
> Components: Flink
> Affects Versions: 0.11.2
> Reporter: Andrew Palumbo
> Assignee: Andrew Palumbo
> Priority: Blocker
> Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark. We need
> to find a way to honour the {{checkpoint()}} contract in Flink Bindings.
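The diff above honours the contract by materializing the dataset to the filesystem once and reading it back on every `cache()` call (note that `readPersistedDataSet` sits outside the `if (!isCached)` block). The plain-Scala sketch below models that pattern with `java.nio.file`. The `FileBackedCache` class, its newline-delimited text serialization, and the temp-directory root are hypothetical stand-ins for the PR's `persist`/`readPersistedDataSet` helpers, not their actual implementation.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical model of cache-by-persistence: the expensive computation runs
// once, its result is written to a file, and every cache() call reads the file.
// Assumption: serialized values contain no newlines.
class FileBackedCache[T](
    compute: () => Seq[T],
    serialize: T => String,
    deserialize: String => T,
    rootDir: Path) {

  private var cacheFile: Option[Path] = None

  def cache(): Seq[T] = {
    val file = cacheFile.getOrElse {
      // First call: materialize and persist, naming the file by nanoTime as in the diff.
      val f = rootDir.resolve(System.nanoTime().toString)
      Files.write(f, compute().map(serialize).mkString("\n").getBytes(UTF_8))
      cacheFile = Some(f)
      f
    }
    // Every call: read the persisted data back instead of recomputing it.
    val text = new String(Files.readAllBytes(file), UTF_8)
    if (text.isEmpty) Nil else text.split("\n").toList.map(deserialize)
  }
}

var computations = 0
val dir = Files.createTempDirectory("drm-cache")
val cached = new FileBackedCache[Int](
  () => { computations += 1; (1 to 5).map(_ * 10) },
  _.toString,
  _.toInt,
  dir)

println(cached.cache()) // List(10, 20, 30, 40, 50)
println(cached.cache()) // List(10, 20, 30, 40, 50)
println(computations)   // 1
```

The computation runs once, but every access pays a filesystem read, which is the main cost difference from Spark's in-memory caching.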