http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java index d54ce97..cb9fd06 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java @@ -1,372 +1,372 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.parfor; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -import org.apache.sysml.api.DMLScript; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; -import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; -import org.apache.sysml.runtime.controlprogram.caching.CacheableData; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; -import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor; -import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; -import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; -import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; -import org.apache.sysml.runtime.instructions.cp.IntObject; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; -import org.apache.sysml.runtime.util.LocalFileUtils; -import org.apache.sysml.utils.Statistics; - -/** - * - */ -public class RemoteDPParWorkerReducer extends ParWorker - implements Reducer<LongWritable, Writable, Writable, Writable> -{ - - //MR data partitioning attributes - private String _inputVar = null; - private String _iterVar = null; - private PDataPartitionFormat _dpf = null; - private OutputInfo _info = null; - private int _rlen = -1; - private int _clen = -1; - private int _brlen = 
-1; - private int _bclen = -1; - - //reuse matrix partition - private MatrixBlock _partition = null; - private boolean _tSparseCol = false; - - //MR ParWorker attributes - protected String _stringID = null; - protected HashMap<String, String> _rvarFnames = null; - - //cached collector/reporter - protected OutputCollector<Writable, Writable> _out = null; - protected Reporter _report = null; - - /** - * - */ - public RemoteDPParWorkerReducer() - { - - } - - @Override - public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter) - throws IOException - { - //cache collector/reporter (for write in close) - _out = out; - _report = reporter; - - //collect input partition - if( _info == OutputInfo.BinaryBlockOutputInfo ) - _partition = collectBinaryBlock( valueList ); - else - _partition = collectBinaryCellInput( valueList ); - - //update in-memory matrix partition - MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar ); - mo.setInMemoryPartition( _partition ); - - //execute program - LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")"); - try { - //create tasks for input data - Task lTask = new Task(TaskType.SET); - lTask.addIteration( new IntObject(_iterVar,key.get()) ); - - //execute program - executeTask( lTask ); - } - catch(Exception ex) - { - throw new IOException("ParFOR: Failed to execute task.",ex); - } - - //statistic maintenance (after final export) - RemoteParForUtils.incrementParForMRCounters(_report, 1, 1); - } - - /** - * - */ - @Override - public void configure(JobConf job) - { - //Step 1: configure data partitioning information - _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job ); - _clen = (int)MRJobConfiguration.getPartitioningNumCols( job ); - _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job ); - _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job ); - _iterVar = MRJobConfiguration.getPartitioningItervar( job ); - _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job ); - _dpf = MRJobConfiguration.getPartitioningFormat( job ); - switch( _dpf ) { //create matrix partition for reuse - case ROW_WISE: _rlen = 1; break; - case COLUMN_WISE: _clen = 1; break; - default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf); - } - _info = MRJobConfiguration.getPartitioningOutputInfo( job ); - _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); - if( _tSparseCol ) - _partition = new MatrixBlock((int)_clen, _rlen, true); - else - _partition = new MatrixBlock((int)_rlen, _clen, false); - - //Step 1: configure parworker - String taskID = job.get("mapred.tip.id"); - LOG.trace("configure RemoteDPParWorkerReducer "+taskID); - - try - { - _stringID = taskID; - _workerID = IDHandler.extractIntID(_stringID); //int task ID - - //use the given job configuration as source for all new job confs - //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar - //which includes a core-default.xml configuration which hides the actual default cluster configuration - //in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
- if( !InfrastructureAnalyzer.isLocalMode(job) ) { - ConfigurationManager.setCachedJobConf(job); - } - - //create local runtime program - String in = MRJobConfiguration.getProgramBlocks(job); - ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID); - _childBlocks = body.getChildBlocks(); - _ec = body.getEc(); - _resultVars = body.getResultVarNames(); - - //init local cache manager - if( !CacheableData.isCachingActive() ) { - String uuid = IDHandler.createDistributedUniqueID(); - LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); - CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup) - } - if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode - CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; - } - - //ensure that resultvar files are not removed - super.pinResultVariables(); - - //enable/disable caching (if required) - boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job ); - if( !cpCaching ) - CacheableData.disableCaching(); - - _numTasks = 0; - _numIters = 0; - } - catch(Exception ex) - { - throw new RuntimeException(ex); - } - - //disable parfor stat monitoring, reporting execution times via counters not useful - StatisticMonitor.disableStatMonitoring(); - - //always reset stats because counters per map task (for case of JVM reuse) - if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) - { - CacheStatistics.reset(); - Statistics.reset(); - } - } - - /** - * - */ - @Override - public void close() - throws IOException - { - try - { - //write output if required (matrix indexed write) - RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out ); - - //statistic maintenance (after final export) - RemoteParForUtils.incrementParForMRCounters(_report, 0, 0); - - //print heaver hitter per task - JobConf job = ConfigurationManager.getCachedJobConf(); - if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) - LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10)); - } - catch(Exception ex) - { - throw new IOException( ex ); - } - - //cleanup cache and local tmp dir - RemoteParForUtils.cleanupWorkingDirectories(); - - //ensure caching is not disabled for CP in local mode - CacheableData.enableCaching(); - } - - /** - * Collects a matrixblock partition from a given input iterator over - * binary blocks. - * - * Note it reuses the instance attribute _partition - multiple calls - * will overwrite the result. 
- * - * @param valueList - * @return - * @throws IOException - */ - private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) - throws IOException - { - try - { - //reset reuse block, keep configured representation - _partition.reset(_rlen, _clen); - - while( valueList.hasNext() ) - { - PairWritableBlock pairValue = (PairWritableBlock)valueList.next(); - int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen; - int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen; - MatrixBlock block = pairValue.block; - if( !_partition.isInSparseFormat() ) //DENSE - { - _partition.copy( row_offset, row_offset+block.getNumRows()-1, - col_offset, col_offset+block.getNumColumns()-1, - pairValue.block, false ); - } - else //SPARSE - { - _partition.appendToSparse(pairValue.block, row_offset, col_offset); - } - } - - //final partition cleanup - cleanupCollectedMatrixPartition( _partition.isInSparseFormat() ); - } - catch(DMLRuntimeException ex) - { - throw new IOException(ex); - } - - return _partition; - } - - - /** - * Collects a matrixblock partition from a given input iterator over - * binary cells. - * - * Note it reuses the instance attribute _partition - multiple calls - * will overwrite the result. - * - * @param valueList - * @return - * @throws IOException - */ - private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) - throws IOException - { - //reset reuse block, keep configured representation - if( _tSparseCol ) - _partition.reset(_clen, _rlen); - else - _partition.reset(_rlen, _clen); - - switch( _dpf ) - { - case ROW_WISE: - while( valueList.hasNext() ) - { - PairWritableCell pairValue = (PairWritableCell)valueList.next(); - if( pairValue.indexes.getColumnIndex()<0 ) - continue; //cells used to ensure empty partitions - _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue()); - } - break; - case COLUMN_WISE: - while( valueList.hasNext() ) - { - PairWritableCell pairValue = (PairWritableCell)valueList.next(); - if( pairValue.indexes.getRowIndex()<0 ) - continue; //cells used to ensure empty partitions - if( _tSparseCol ) - _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue()); - else - _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue()); - } - break; - default: - throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf); - } - - //final partition cleanup - cleanupCollectedMatrixPartition(_tSparseCol); - - return _partition; - } - - /** - * - * @param sort - * @throws IOException - */ - private void cleanupCollectedMatrixPartition(boolean sort) - throws IOException - { - //sort sparse row contents if required - if( _partition.isInSparseFormat() && sort ) - _partition.sortSparseRows(); - - //ensure right number of nnz - if( !_partition.isInSparseFormat() ) - _partition.recomputeNonZeros(); - - //exam and switch dense/sparse representation - try { - _partition.examSparsity(); - } - catch(Exception ex){ - throw new IOException(ex); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.controlprogram.parfor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor; +import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; +import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; +import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; +import org.apache.sysml.runtime.instructions.cp.IntObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; +import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.utils.Statistics; + +/** + * + */ +public class RemoteDPParWorkerReducer extends ParWorker + implements Reducer<LongWritable, Writable, Writable, Writable> +{ + + //MR data partitioning attributes + private String _inputVar = null; + private String _iterVar = null; + private PDataPartitionFormat _dpf = null; + private OutputInfo _info = null; + private int _rlen = -1; + private int _clen = -1; + private int _brlen = -1; + private int _bclen = -1; + + //reuse matrix partition + private MatrixBlock _partition = null; + private boolean _tSparseCol = false; + + //MR ParWorker attributes + protected String _stringID = null; + protected HashMap<String, String> _rvarFnames = null; + + //cached collector/reporter + protected OutputCollector<Writable, Writable> _out = null; + protected Reporter _report = null; + + /** + * + */ + public RemoteDPParWorkerReducer() + { + + } + + @Override + public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter) + throws IOException + { + //cache collector/reporter (for write in close) + _out = out; + _report = reporter; + + //collect input partition + if( _info == OutputInfo.BinaryBlockOutputInfo ) + _partition = collectBinaryBlock( valueList ); + else + _partition = collectBinaryCellInput( valueList ); + + //update in-memory matrix partition + MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar ); + mo.setInMemoryPartition( _partition ); + + //execute program + 
LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")"); + try { + //create tasks for input data + Task lTask = new Task(TaskType.SET); + lTask.addIteration( new IntObject(_iterVar,key.get()) ); + + //execute program + executeTask( lTask ); + } + catch(Exception ex) + { + throw new IOException("ParFOR: Failed to execute task.",ex); + } + + //statistic maintenance (after final export) + RemoteParForUtils.incrementParForMRCounters(_report, 1, 1); + } + + /** + * + */ + @Override + public void configure(JobConf job) + { + //Step 1: configure data partitioning information + _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job ); + _clen = (int)MRJobConfiguration.getPartitioningNumCols( job ); + _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job ); + _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job ); + _iterVar = MRJobConfiguration.getPartitioningItervar( job ); + _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job ); + _dpf = MRJobConfiguration.getPartitioningFormat( job ); + switch( _dpf ) { //create matrix partition for reuse + case ROW_WISE: _rlen = 1; break; + case COLUMN_WISE: _clen = 1; break; + default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf); + } + _info = MRJobConfiguration.getPartitioningOutputInfo( job ); + _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); + if( _tSparseCol ) + _partition = new MatrixBlock((int)_clen, _rlen, true); + else + _partition = new MatrixBlock((int)_rlen, _clen, false); + + //Step 1: configure parworker + String taskID = job.get("mapred.tip.id"); + LOG.trace("configure RemoteDPParWorkerReducer "+taskID); + + try + { + _stringID = taskID; + _workerID = IDHandler.extractIntID(_stringID); //int task ID + + //use the given job configuration as source for all new job confs + //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar + //which includes a core-default.xml configuration which hides the actual default cluster configuration + //in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) { + ConfigurationManager.setCachedJobConf(job); + } + + //create local runtime program + String in = MRJobConfiguration.getProgramBlocks(job); + ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID); + _childBlocks = body.getChildBlocks(); + _ec = body.getEc(); + _resultVars = body.getResultVarNames(); + + //init local cache manager + if( !CacheableData.isCachingActive() ) { + String uuid = IDHandler.createDistributedUniqueID(); + LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); + CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup) + } + if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode + CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; + } + + //ensure that resultvar files are not removed + super.pinResultVariables(); + + //enable/disable caching (if required) + boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job ); + if( !cpCaching ) + CacheableData.disableCaching(); + + _numTasks = 0; + _numIters = 0; + } + catch(Exception ex) + { + throw new RuntimeException(ex); + } + + //disable parfor stat monitoring; reporting execution times via counters is not useful + StatisticMonitor.disableStatMonitoring(); + + //always reset stats because counters are per map task (in case of JVM reuse) + if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) + { + CacheStatistics.reset(); + Statistics.reset(); + } + } + + /** + * + */ + @Override + public void close() + throws IOException + { + try + { + //write output if required (matrix indexed write) + RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out ); + + //statistic maintenance (after final export) + RemoteParForUtils.incrementParForMRCounters(_report, 0, 0); + + //print heavy hitters per task + JobConf job = ConfigurationManager.getCachedJobConf(); + if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) + LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10)); + } + catch(Exception ex) + { + throw new IOException( ex ); + } + + //cleanup cache and local tmp dir + RemoteParForUtils.cleanupWorkingDirectories(); + + //ensure caching is not disabled for CP in local mode + CacheableData.enableCaching(); + } + + /** + * Collects a MatrixBlock partition from a given input iterator over + * binary blocks. + * + * Note that it reuses the instance attribute _partition - multiple calls + * will overwrite the result.
+ * + * @param valueList + * @return + * @throws IOException + */ + private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) + throws IOException + { + try + { + //reset reuse block, keep configured representation + _partition.reset(_rlen, _clen); + + while( valueList.hasNext() ) + { + PairWritableBlock pairValue = (PairWritableBlock)valueList.next(); + int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen; + int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen; + MatrixBlock block = pairValue.block; + if( !_partition.isInSparseFormat() ) //DENSE + { + _partition.copy( row_offset, row_offset+block.getNumRows()-1, + col_offset, col_offset+block.getNumColumns()-1, + pairValue.block, false ); + } + else //SPARSE + { + _partition.appendToSparse(pairValue.block, row_offset, col_offset); + } + } + + //final partition cleanup + cleanupCollectedMatrixPartition( _partition.isInSparseFormat() ); + } + catch(DMLRuntimeException ex) + { + throw new IOException(ex); + } + + return _partition; + } + + + /** + * Collects a MatrixBlock partition from a given input iterator over + * binary cells. + * + * Note that it reuses the instance attribute _partition - multiple calls + * will overwrite the result. + * + * @param valueList + * @return + * @throws IOException + */ + private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) + throws IOException + { + //reset reuse block, keep configured representation + if( _tSparseCol ) + _partition.reset(_clen, _rlen); + else + _partition.reset(_rlen, _clen); + + switch( _dpf ) + { + case ROW_WISE: + while( valueList.hasNext() ) + { + PairWritableCell pairValue = (PairWritableCell)valueList.next(); + if( pairValue.indexes.getColumnIndex()<0 ) + continue; //cells used to ensure empty partitions + _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue()); + } + break; + case COLUMN_WISE: + while( valueList.hasNext() ) + { + PairWritableCell pairValue = (PairWritableCell)valueList.next(); + if( pairValue.indexes.getRowIndex()<0 ) + continue; //cells used to ensure empty partitions + if( _tSparseCol ) + _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue()); + else + _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue()); + } + break; + default: + throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf); + } + + //final partition cleanup + cleanupCollectedMatrixPartition(_tSparseCol); + + return _partition; + } + + /** + * + * @param sort + * @throws IOException + */ + private void cleanupCollectedMatrixPartition(boolean sort) + throws IOException + { + //sort sparse row contents if required + if( _partition.isInSparseFormat() && sort ) + _partition.sortSparseRows(); + + //ensure correct number of nnz + if( !_partition.isInSparseFormat() ) + _partition.recomputeNonZeros(); + + //examine and switch dense/sparse representation + try { + _partition.examSparsity(); + } + catch(Exception ex){ + throw new IOException(ex); + } + } +}
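The dense-copy path in collectBinaryBlock above reduces to one idea: block indexes are 1-based, so a block at (rowIndex, colIndex) with block size brlen x bclen lands at cell offset ((rowIndex-1)*brlen, (colIndex-1)*bclen) in the reused partition. The following is a minimal, self-contained sketch of that reassembly using plain 2D arrays rather than the SystemML MatrixBlock API; all names here (DensePartitionAssembler, Block, assemble) are hypothetical illustrations, not SystemML code.

import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified sketch of the dense path in collectBinaryBlock:
// blocks carry 1-based (rowIndex, colIndex) block coordinates and are copied
// into a reused partition buffer at the derived 0-based cell offsets.
public class DensePartitionAssembler
{
    static final class Block {
        final long rowIndex, colIndex;  //1-based block indexes
        final double[][] data;          //block contents
        Block(long r, long c, double[][] d) { rowIndex = r; colIndex = c; data = d; }
    }

    //rlen/clen: partition dimensions; brlen/bclen: block dimensions
    static double[][] assemble(List<Block> blocks, int rlen, int clen, int brlen, int bclen)
    {
        double[][] partition = new double[rlen][clen]; //reset and reused in the real code
        for( Block b : blocks ) {
            int rowOffset = (int)(b.rowIndex - 1) * brlen; //1-based block index -> 0-based cell offset
            int colOffset = (int)(b.colIndex - 1) * bclen;
            for( int i = 0; i < b.data.length; i++ )
                for( int j = 0; j < b.data[i].length; j++ )
                    partition[rowOffset + i][colOffset + j] = b.data[i][j];
        }
        return partition;
    }

    public static void main(String[] args) {
        //two 2x2 blocks forming one 2x4 row partition
        Block b1 = new Block(1, 1, new double[][]{{1, 2}, {5, 6}});
        Block b2 = new Block(1, 2, new double[][]{{3, 4}, {7, 8}});
        System.out.println(Arrays.deepToString(assemble(Arrays.asList(b1, b2), 2, 4, 2, 2)));
    }
}

Running main prints [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]], i.e., the two 2x2 blocks stitched into a single 2x4 row partition, which is the same offset arithmetic the reducer applies per incoming PairWritableBlock.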
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java index 2426bc2..67beff6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java @@ -1,266 +1,266 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.parfor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -import scala.Tuple2; - -import org.apache.sysml.api.DMLScript; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.parser.Expression.DataType; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.LocalVariableMap; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; -import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; -import org.apache.sysml.runtime.controlprogram.caching.CacheableData; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; -import org.apache.sysml.runtime.instructions.cp.Data; -import org.apache.sysml.runtime.util.LocalFileUtils; -import org.apache.sysml.utils.Statistics; - -/** - * Common functionalities for parfor workers in MR jobs. 
Used by worker wrappers in - * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor) - * - */ -public class RemoteParForUtils -{ - - /** - * - * @param reporter - * @param deltaTasks - * @param deltaIterations - */ - public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations) - { - //report parfor counters - if( deltaTasks>0 ) - reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks); - if( deltaIterations>0 ) - reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations); - - JobConf job = ConfigurationManager.getCachedJobConf(); - if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) - { - //report cache statistics - reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime()); - reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount()); - reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime()); - reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime()); - - //reset cache statistics to prevent overlapping reporting - CacheStatistics.reset(); - } - } - - /** - * - * @param workerID - * @param vars - * @param resultVars - * @param out - * @throws DMLRuntimeException - * @throws IOException - */ - public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) - throws DMLRuntimeException, IOException - { - exportResultVariables(workerID, vars, resultVars, null, out); - } - - /** - * For remote MR parfor workers. 
- * - * @param workerID - * @param vars - * @param resultVars - * @param rvarFnames - * @param out - * @throws DMLRuntimeException - * @throws IOException - */ - public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, - HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) - throws DMLRuntimeException, IOException - { - //create key and value for reuse - LongWritable okey = new LongWritable( workerID ); - Text ovalue = new Text(); - - //foreach result variables probe if export necessary - for( String rvar : resultVars ) - { - Data dat = vars.get( rvar ); - - //export output variable to HDFS (see RunMRJobs) - if ( dat != null && dat.getDataType() == DataType.MATRIX ) - { - MatrixObject mo = (MatrixObject) dat; - if( mo.isDirty() ) - { - if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null ) - { - String fname = rvarFnames.get( rvar ); - if( fname!=null ) - mo.setFileName( fname ); - - //export result var (iff actually modified in parfor) - mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation) - rvarFnames.put(rvar, mo.getFileName()); - } - else - { - //export result var (iff actually modified in parfor) - mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation) - } - - //pass output vars (scalars by value, matrix by ref) to result - //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge) - String datStr = ProgramConverter.serializeDataObject(rvar, mo); - ovalue.set( datStr ); - out.collect( okey, ovalue ); - } - } - } - } - - /** - * For remote Spark parfor workers. This is a simplified version compared to MR. - * - * @param workerID - * @param vars - * @param resultVars - * @param rvarFnames - * @throws DMLRuntimeException - * @throws IOException - */ - public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) - throws DMLRuntimeException, IOException - { - ArrayList<String> ret = new ArrayList<String>(); - - //foreach result variables probe if export necessary - for( String rvar : resultVars ) - { - Data dat = vars.get( rvar ); - - //export output variable to HDFS (see RunMRJobs) - if ( dat != null && dat.getDataType() == DataType.MATRIX ) - { - MatrixObject mo = (MatrixObject) dat; - if( mo.isDirty() ) - { - //export result var (iff actually modified in parfor) - mo.exportData(); - - - //pass output vars (scalars by value, matrix by ref) to result - //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge) - ret.add( ProgramConverter.serializeDataObject(rvar, mo) ); - } - } - } - - return ret; - } - - - /** - * Cleanup all temporary files created by this SystemML process - * instance. 
- * - */ - public static void cleanupWorkingDirectories() - { - //use the given job configuration for infrastructure analysis (see configure); - //this is important for robustness w/ misconfigured classpath which also contains - //core-default.xml and hence hides the actual cluster configuration; otherwise - //there is missing cleanup of working directories - JobConf job = ConfigurationManager.getCachedJobConf(); - - if( !InfrastructureAnalyzer.isLocalMode(job) ) - { - //delete cache files - CacheableData.cleanupCacheDir(); - //disable caching (prevent dynamic eviction) - CacheableData.disableCaching(); - //cleanup working dir (e.g., of CP_FILE instructions) - LocalFileUtils.cleanupWorkingDirectory(); - } - } - - /** - * - * @param out - * @return - * @throws DMLRuntimeException - * @throws IOException - */ - public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) - throws DMLRuntimeException - { - HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>(); - - int countAll = 0; - for( Tuple2<Long,String> entry : out ) - { - Long key = entry._1(); - String val = entry._2(); - if( !tmp.containsKey( key ) ) - tmp.put(key, new LocalVariableMap ()); - Object[] dat = ProgramConverter.parseDataObject( val ); - tmp.get(key).put((String)dat[0], (Data)dat[1]); - countAll++; - } - - if( LOG != null ) { - LOG.debug("Num remote worker results (before deduplication): "+countAll); - LOG.debug("Num remote worker results: "+tmp.size()); - } - - //create return array - return tmp.values().toArray(new LocalVariableMap[0]); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.controlprogram.parfor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +import scala.Tuple2; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.LocalVariableMap; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; +import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; +import org.apache.sysml.runtime.instructions.cp.Data; +import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.utils.Statistics; + +/** + * Common functionalities for parfor workers in MR jobs. Used by worker wrappers in + * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor) + * + */ +public class RemoteParForUtils +{ + + /** + * + * @param reporter + * @param deltaTasks + * @param deltaIterations + */ + public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations) + { + //report parfor counters + if( deltaTasks>0 ) + reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks); + if( deltaIterations>0 ) + reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations); + + JobConf job = ConfigurationManager.getCachedJobConf(); + if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) ) + { + //report cache statistics + reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime()); + reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount()); + reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites()); + reporter.incrCounter( 
CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime()); + reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime()); + + //reset cache statistics to prevent overlapping reporting + CacheStatistics.reset(); + } + } + + /** + * + * @param workerID + * @param vars + * @param resultVars + * @param out + * @throws DMLRuntimeException + * @throws IOException + */ + public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) + throws DMLRuntimeException, IOException + { + exportResultVariables(workerID, vars, resultVars, null, out); + } + + /** + * For remote MR parfor workers. + * + * @param workerID + * @param vars + * @param resultVars + * @param rvarFnames + * @param out + * @throws DMLRuntimeException + * @throws IOException + */ + public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, + HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) + throws DMLRuntimeException, IOException + { + //create key and value for reuse + LongWritable okey = new LongWritable( workerID ); + Text ovalue = new Text(); + + //for each result variable, probe whether export is necessary + for( String rvar : resultVars ) + { + Data dat = vars.get( rvar ); + + //export output variable to HDFS (see RunMRJobs) + if ( dat != null && dat.getDataType() == DataType.MATRIX ) + { + MatrixObject mo = (MatrixObject) dat; + if( mo.isDirty() ) + { + if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null ) + { + String fname = rvarFnames.get( rvar ); + if( fname!=null ) + mo.setFileName( fname ); + + //export result var (iff actually modified in parfor) + mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation) + rvarFnames.put(rvar, mo.getFileName()); + } + else + { + //export result var (iff actually modified in parfor) + mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation) + } + + //pass output vars (scalars by value, matrix by ref) to result + //(only if actually exported, hence the check for dirty, otherwise potential problems in result merge) + String datStr = ProgramConverter.serializeDataObject(rvar, mo); + ovalue.set( datStr ); + out.collect( okey, ovalue ); + } + } + } + } + + /** + * For remote Spark parfor workers. This is a simplified version compared to MR.
+ * + * @param workerID + * @param vars + * @param resultVars + * @param rvarFnames + * @throws DMLRuntimeException + * @throws IOException + */ + public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) + throws DMLRuntimeException, IOException + { + ArrayList<String> ret = new ArrayList<String>(); + + //foreach result variables probe if export necessary + for( String rvar : resultVars ) + { + Data dat = vars.get( rvar ); + + //export output variable to HDFS (see RunMRJobs) + if ( dat != null && dat.getDataType() == DataType.MATRIX ) + { + MatrixObject mo = (MatrixObject) dat; + if( mo.isDirty() ) + { + //export result var (iff actually modified in parfor) + mo.exportData(); + + + //pass output vars (scalars by value, matrix by ref) to result + //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge) + ret.add( ProgramConverter.serializeDataObject(rvar, mo) ); + } + } + } + + return ret; + } + + + /** + * Cleanup all temporary files created by this SystemML process + * instance. + * + */ + public static void cleanupWorkingDirectories() + { + //use the given job configuration for infrastructure analysis (see configure); + //this is important for robustness w/ misconfigured classpath which also contains + //core-default.xml and hence hides the actual cluster configuration; otherwise + //there is missing cleanup of working directories + JobConf job = ConfigurationManager.getCachedJobConf(); + + if( !InfrastructureAnalyzer.isLocalMode(job) ) + { + //delete cache files + CacheableData.cleanupCacheDir(); + //disable caching (prevent dynamic eviction) + CacheableData.disableCaching(); + //cleanup working dir (e.g., of CP_FILE instructions) + LocalFileUtils.cleanupWorkingDirectory(); + } + } + + /** + * + * @param out + * @return + * @throws DMLRuntimeException + * @throws IOException + */ + public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) + throws DMLRuntimeException + { + HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>(); + + int countAll = 0; + for( Tuple2<Long,String> entry : out ) + { + Long key = entry._1(); + String val = entry._2(); + if( !tmp.containsKey( key ) ) + tmp.put(key, new LocalVariableMap ()); + Object[] dat = ProgramConverter.parseDataObject( val ); + tmp.get(key).put((String)dat[0], (Data)dat[1]); + countAll++; + } + + if( LOG != null ) { + LOG.debug("Num remote worker results (before deduplication): "+countAll); + LOG.debug("Num remote worker results: "+tmp.size()); + } + + //create return array + return tmp.values().toArray(new LocalVariableMap[0]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml index 6f478bc..e6556fa 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml @@ -1,58 +1,58 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - - -#PerfTestTool: DML template for estimation cost functions. - -dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n) -return (Matrix[Double] D) -implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem") - -dynWrite = externalFunction(Matrix[Double] R, String fname) -return (Matrix[Double] D) -implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem") - -solve = externalFunction(Matrix[Double] A, Matrix[Double] y) -return (Matrix[Double] b) -implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem") - -k = %numModels%; -m = -1; -n = -1; - -dummy = matrix(1,rows=1,cols=1); - -for( i in 1:k, par=8, mode=LOCAL ) -{ - sin1 = "./conf/PerfTestTool/"+i+"_in1.csv"; - sin2 = "./conf/PerfTestTool/"+i+"_in2.csv"; - - D = dynRead( dummy, sin1, m, n ); - y = dynRead( dummy, sin2, m, 1 ); - - A = t(D) %*% D; # X'X - b = t(D) %*% y; # X'y - beta = solve(A,b); - - sout = "./conf/PerfTestTool/"+i+"_out.csv"; - - X=dynWrite( beta, sout ); +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +#PerfTestTool: DML template for estimation cost functions. 
+ +dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n) +return (Matrix[Double] D) +implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem") + +dynWrite = externalFunction(Matrix[Double] R, String fname) +return (Matrix[Double] D) +implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem") + +solve = externalFunction(Matrix[Double] A, Matrix[Double] y) +return (Matrix[Double] b) +implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem") + +k = %numModels%; +m = -1; +n = -1; + +dummy = matrix(1,rows=1,cols=1); + +for( i in 1:k, par=8, mode=LOCAL ) +{ + sin1 = "./conf/PerfTestTool/"+i+"_in1.csv"; + sin2 = "./conf/PerfTestTool/"+i+"_in2.csv"; + + D = dynRead( dummy, sin1, m, n ); + y = dynRead( dummy, sin2, m, 1 ); + + A = t(D) %*% D; # X'X + b = t(D) %*% y; # X'y + beta = solve(A,b); + + sout = "./conf/PerfTestTool/"+i+"_out.csv"; + + X=dynWrite( beta, sout ); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java index 1dd419f..13c68e2 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java @@ -1,64 +1,64 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.instructions.spark.data; - -import java.lang.ref.SoftReference; - -import org.apache.spark.broadcast.Broadcast; - -public class BroadcastObject extends LineageObject -{ - //soft reference storage for graceful cleanup in case of memory pressure - private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null; - - public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName ) - { - _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar); - _varName = varName; - } - - /** - * - * @return - */ - public PartitionedBroadcastMatrix getBroadcast() - { - return _bcHandle.get(); - } - - /** - * - * @return - */ - public boolean isValid() - { - //check for evicted soft reference - PartitionedBroadcastMatrix pbm = _bcHandle.get(); - if( pbm == null ) - return false; - - //check for validity of individual broadcasts - Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts(); - for( Broadcast<PartitionedMatrixBlock> bc : tmp ) - if( !bc.isValid() ) - return false; - return true; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.instructions.spark.data; + +import java.lang.ref.SoftReference; + +import org.apache.spark.broadcast.Broadcast; + +public class BroadcastObject extends LineageObject +{ + //soft reference storage for graceful cleanup in case of memory pressure + private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null; + + public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName ) + { + _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar); + _varName = varName; + } + + /** + * + * @return + */ + public PartitionedBroadcastMatrix getBroadcast() + { + return _bcHandle.get(); + } + + /** + * + * @return + */ + public boolean isValid() + { + //check for evicted soft reference + PartitionedBroadcastMatrix pbm = _bcHandle.get(); + if( pbm == null ) + return false; + + //check for validity of individual broadcasts + Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts(); + for( Broadcast<PartitionedMatrixBlock> bc : tmp ) + if( !bc.isValid() ) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java index b2bb62c..bcf37bb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java @@ -1,83 +1,83 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.instructions.spark.data; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; - -public abstract class LineageObject -{ - - //basic lineage information - protected int _numRef = -1; - protected List<LineageObject> _childs = null; - protected String _varName = null; - - //N:1 back reference to matrix object - protected MatrixObject _mo = null; - - protected LineageObject() - { - _numRef = 0; - _childs = new ArrayList<LineageObject>(); - } - - public String getVarName() { - return _varName; - } - - public int getNumReferences() - { - return _numRef; - } - - public void setBackReference(MatrixObject mo) - { - _mo = mo; - } - - public boolean hasBackReference() - { - return (_mo != null); - } - - public void incrementNumReferences() - { - _numRef++; - } - - public void decrementNumReferences() - { - _numRef--; - } - - public List<LineageObject> getLineageChilds() - { - return _childs; - } - - public void addLineageChild(LineageObject lob) - { - lob.incrementNumReferences(); - _childs.add( lob ); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.instructions.spark.data; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; + +public abstract class LineageObject +{ + + //basic lineage information + protected int _numRef = -1; + protected List<LineageObject> _childs = null; + protected String _varName = null; + + //N:1 back reference to matrix object + protected MatrixObject _mo = null; + + protected LineageObject() + { + _numRef = 0; + _childs = new ArrayList<LineageObject>(); + } + + public String getVarName() { + return _varName; + } + + public int getNumReferences() + { + return _numRef; + } + + public void setBackReference(MatrixObject mo) + { + _mo = mo; + } + + public boolean hasBackReference() + { + return (_mo != null); + } + + public void incrementNumReferences() + { + _numRef++; + } + + public void decrementNumReferences() + { + _numRef--; + } + + public List<LineageObject> getLineageChilds() + { + return _childs; + } + + public void addLineageChild(LineageObject lob) + { + lob.incrementNumReferences(); + _childs.add( lob ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java index fb7e773..605e7ca 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java @@ -1,124 +1,124 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.instructions.spark.data; - -import org.apache.spark.api.java.JavaPairRDD; - -public class RDDObject extends LineageObject -{ - - private JavaPairRDD<?,?> _rddHandle = null; - - //meta data on origin of given rdd handle - private boolean _checkpointed = false; //created via checkpoint instruction - private boolean _hdfsfile = false; //created from hdfs file - private String _hdfsFname = null; //hdfs filename, if created from hdfs. 
- - public RDDObject( JavaPairRDD<?,?> rddvar, String varName) - { - _rddHandle = rddvar; - _varName = varName; - } - - /** - * - * @return - */ - public JavaPairRDD<?,?> getRDD() - { - return _rddHandle; - } - - public void setCheckpointRDD( boolean flag ) - { - _checkpointed = flag; - } - - public boolean isCheckpointRDD() - { - return _checkpointed; - } - - public void setHDFSFile( boolean flag ) { - _hdfsfile = flag; - } - - public void setHDFSFilename( String fname ) { - _hdfsFname = fname; - } - - public boolean isHDFSFile() { - return _hdfsfile; - } - - public String getHDFSFilename() { - return _hdfsFname; - } - - - /** - * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file; - * in both cases, we can directly read the file instead of collecting - * the given rdd. - * - * @return - */ - public boolean allowsShortCircuitRead() - { - boolean ret = isHDFSFile(); - - if( isCheckpointRDD() && getLineageChilds().size() == 1 ) { - LineageObject lo = getLineageChilds().get(0); - ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() ); - } - - return ret; - } - - /** - * - * @return - */ - public boolean allowsShortCircuitCollect() - { - return ( isCheckpointRDD() && getLineageChilds().size() == 1 - && getLineageChilds().get(0) instanceof RDDObject ); - } - - /** - * - * @return - */ - public boolean rHasCheckpointRDDChilds() - { - //probe for checkpoint rdd - if( _checkpointed ) - return true; - - //process childs recursively - boolean ret = false; - for( LineageObject lo : getLineageChilds() ) { - if( lo instanceof RDDObject ) - ret |= ((RDDObject)lo).rHasCheckpointRDDChilds(); - } - - return ret; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark.data; + +import org.apache.spark.api.java.JavaPairRDD; + +public class RDDObject extends LineageObject +{ + + private JavaPairRDD<?,?> _rddHandle = null; + + //meta data on origin of given rdd handle + private boolean _checkpointed = false; //created via checkpoint instruction + private boolean _hdfsfile = false; //created from hdfs file + private String _hdfsFname = null; //hdfs filename, if created from hdfs. 
+ + public RDDObject( JavaPairRDD<?,?> rddvar, String varName) + { + _rddHandle = rddvar; + _varName = varName; + } + + /** + * + * @return + */ + public JavaPairRDD<?,?> getRDD() + { + return _rddHandle; + } + + public void setCheckpointRDD( boolean flag ) + { + _checkpointed = flag; + } + + public boolean isCheckpointRDD() + { + return _checkpointed; + } + + public void setHDFSFile( boolean flag ) { + _hdfsfile = flag; + } + + public void setHDFSFilename( String fname ) { + _hdfsFname = fname; + } + + public boolean isHDFSFile() { + return _hdfsfile; + } + + public String getHDFSFilename() { + return _hdfsFname; + } + + + /** + * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file; + * in both cases, we can directly read the file instead of collecting + * the given rdd. + * + * @return + */ + public boolean allowsShortCircuitRead() + { + boolean ret = isHDFSFile(); + + if( isCheckpointRDD() && getLineageChilds().size() == 1 ) { + LineageObject lo = getLineageChilds().get(0); + ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() ); + } + + return ret; + } + + /** + * + * @return + */ + public boolean allowsShortCircuitCollect() + { + return ( isCheckpointRDD() && getLineageChilds().size() == 1 + && getLineageChilds().get(0) instanceof RDDObject ); + } + + /** + * + * @return + */ + public boolean rHasCheckpointRDDChilds() + { + //probe for checkpoint rdd + if( _checkpointed ) + return true; + + //process childs recursively + boolean ret = false; + for( LineageObject lo : getLineageChilds() ) { + if( lo instanceof RDDObject ) + ret |= ((RDDObject)lo).rHasCheckpointRDDChilds(); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java index e561f3c..6cb0830 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java @@ -1,167 +1,167 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.matrix.mapred; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.sysml.runtime.functionobjects.CM; -import org.apache.sysml.runtime.functionobjects.KahanPlus; -import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; -import org.apache.sysml.runtime.instructions.cp.KahanObject; -import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction; -import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes; -import org.apache.sysml.runtime.matrix.data.WeightedCell; -import org.apache.sysml.runtime.matrix.operators.AggregateOperator; -import org.apache.sysml.runtime.matrix.operators.CMOperator; -import org.apache.sysml.runtime.matrix.operators.Operator; - - -public class GroupedAggMRCombiner extends ReduceBase - implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell> -{ - //grouped aggregate instructions - private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>(); - - //reused intermediate objects - private CM_COV_Object cmObj = new CM_COV_Object(); - private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>(); - private WeightedCell outCell = new WeightedCell(); - - @Override - public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values, - OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter) - throws IOException - { - long start = System.currentTimeMillis(); - - //get aggregate operator - GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag()); - Operator op = ins.getOperator(); - boolean isPartialAgg = true; - - //combine iterator to single value - try - { - if(op instanceof CMOperator) //everything except sum - { - if( ((CMOperator) op).isPartialAggregateOperator() ) - { - cmObj.reset(); - CM lcmFn = cmFn.get(key.getTag()); - - //partial aggregate cm operator - while( values.hasNext() ) - { - WeightedCell value=values.next(); - lcmFn.execute(cmObj, value.getValue(), value.getWeight()); - } - - outCell.setValue(cmObj.getRequiredPartialResult(op)); - outCell.setWeight(cmObj.getWeight()); - } - else //forward tuples to reducer - { - isPartialAgg = false; - while( values.hasNext() ) - out.collect(key, values.next()); - } - } - else if(op instanceof AggregateOperator) //sum - { - AggregateOperator aggop=(AggregateOperator) op; - - if( aggop.correctionExists ) - { - KahanObject buffer=new KahanObject(aggop.initialValue, 0); - - KahanPlus.getKahanPlusFnObject(); - - //partial aggregate with correction - while( values.hasNext() ) - { - WeightedCell value=values.next(); - aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight()); - } - - outCell.setValue(buffer._sum); - outCell.setWeight(1); - } - else //no correction - { - double v = aggop.initialValue; - - //partial aggregate without correction - while(values.hasNext()) - { - WeightedCell value=values.next(); - v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight()); - } - - outCell.setValue(v); - outCell.setWeight(1); - } - } - else - throw new IOException("Unsupported operator in instruction: " + ins); - } - catch(Exception ex) - { - throw new IOException(ex); - } - - //collect the output (to reducer) - if( isPartialAgg ) - out.collect(key, outCell); - - 
reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start); - } - - @Override - public void configure(JobConf job) - { - try - { - GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job); - if( grpaggIns != null ) - for(GroupedAggregateInstruction ins : grpaggIns) - { - grpaggInstructions.put(ins.output, ins); - if( ins.getOperator() instanceof CMOperator ) - cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType())); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @Override - public void close() - { - //do nothing, overrides unnecessary handling in superclass - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.matrix.mapred; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.sysml.runtime.functionobjects.CM; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction; +import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes; +import org.apache.sysml.runtime.matrix.data.WeightedCell; +import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.CMOperator; +import org.apache.sysml.runtime.matrix.operators.Operator; + + +public class GroupedAggMRCombiner extends ReduceBase + implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell> +{ + //grouped aggregate instructions + private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>(); + + //reused intermediate objects + private CM_COV_Object cmObj = new CM_COV_Object(); + private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>(); + private WeightedCell outCell = new WeightedCell(); + + @Override + public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values, + OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter) + throws IOException + { + long start = System.currentTimeMillis(); + + //get aggregate operator + GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag()); + Operator op = ins.getOperator(); + boolean isPartialAgg = true; + + //combine iterator to single value + try + { + if(op instanceof CMOperator) //everything except sum + { + if( ((CMOperator) op).isPartialAggregateOperator() ) + 
{ + cmObj.reset(); + CM lcmFn = cmFn.get(key.getTag()); + + //partial aggregate cm operator + while( values.hasNext() ) + { + WeightedCell value=values.next(); + lcmFn.execute(cmObj, value.getValue(), value.getWeight()); + } + + outCell.setValue(cmObj.getRequiredPartialResult(op)); + outCell.setWeight(cmObj.getWeight()); + } + else //forward tuples to reducer + { + isPartialAgg = false; + while( values.hasNext() ) + out.collect(key, values.next()); + } + } + else if(op instanceof AggregateOperator) //sum + { + AggregateOperator aggop=(AggregateOperator) op; + + if( aggop.correctionExists ) + { + KahanObject buffer=new KahanObject(aggop.initialValue, 0); + + KahanPlus.getKahanPlusFnObject(); + + //partial aggregate with correction + while( values.hasNext() ) + { + WeightedCell value=values.next(); + aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight()); + } + + outCell.setValue(buffer._sum); + outCell.setWeight(1); + } + else //no correction + { + double v = aggop.initialValue; + + //partial aggregate without correction + while(values.hasNext()) + { + WeightedCell value=values.next(); + v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight()); + } + + outCell.setValue(v); + outCell.setWeight(1); + } + } + else + throw new IOException("Unsupported operator in instruction: " + ins); + } + catch(Exception ex) + { + throw new IOException(ex); + } + + //collect the output (to reducer) + if( isPartialAgg ) + out.collect(key, outCell); + + reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start); + } + + @Override + public void configure(JobConf job) + { + try + { + GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job); + if( grpaggIns != null ) + for(GroupedAggregateInstruction ins : grpaggIns) + { + grpaggInstructions.put(ins.output, ins); + if( ins.getOperator() instanceof CMOperator ) + cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType())); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @Override + public void close() + { + //do nothing, overrides unnecessary handling in superclass + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java index a08631d..fa9843a 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java @@ -1,84 +1,84 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.matrix.sort; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.WritableComparable; - -@SuppressWarnings("rawtypes") -public class IndexSortComparable implements WritableComparable -{ - - protected DoubleWritable _dval = null; - protected LongWritable _lval = null; - - public IndexSortComparable() - { - _dval = new DoubleWritable(); - _lval = new LongWritable(); - } - - public void set(double dval, long lval) - { - _dval.set(dval); - _lval.set(lval); - } - - @Override - public void readFields(DataInput arg0) - throws IOException - { - _dval.readFields(arg0); - _lval.readFields(arg0); - } - - @Override - public void write(DataOutput arg0) - throws IOException - { - _dval.write(arg0); - _lval.write(arg0); - } - - @Override - public int compareTo(Object o) - { - //compare only double value (e.g., for partitioner) - if( o instanceof DoubleWritable ) { - return _dval.compareTo((DoubleWritable) o); - } - //compare double value and index (e.g., for stable sort) - else if( o instanceof IndexSortComparable) { - IndexSortComparable that = (IndexSortComparable)o; - int tmp = _dval.compareTo(that._dval); - if( tmp==0 ) //secondary sort - tmp = _lval.compareTo(that._lval); - return tmp; - } - else { - throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName()); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.matrix.sort; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +@SuppressWarnings("rawtypes") +public class IndexSortComparable implements WritableComparable +{ + + protected DoubleWritable _dval = null; + protected LongWritable _lval = null; + + public IndexSortComparable() + { + _dval = new DoubleWritable(); + _lval = new LongWritable(); + } + + public void set(double dval, long lval) + { + _dval.set(dval); + _lval.set(lval); + } + + @Override + public void readFields(DataInput arg0) + throws IOException + { + _dval.readFields(arg0); + _lval.readFields(arg0); + } + + @Override + public void write(DataOutput arg0) + throws IOException + { + _dval.write(arg0); + _lval.write(arg0); + } + + @Override + public int compareTo(Object o) + { + //compare only double value (e.g., for partitioner) + if( o instanceof DoubleWritable ) { + return _dval.compareTo((DoubleWritable) o); + } + //compare double value and index (e.g., for stable sort) + else if( o instanceof IndexSortComparable) { + IndexSortComparable that = (IndexSortComparable)o; + int tmp = _dval.compareTo(that._dval); + if( tmp==0 ) //secondary sort + tmp = _lval.compareTo(that._lval); + return tmp; + } + else { + throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName()); + } + } +}
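The BroadcastObject class in this commit keeps its PartitionedBroadcastMatrix behind a SoftReference, so the JVM may reclaim the broadcast under memory pressure, and isValid() reports both GC eviction and Spark-side invalidation of the individual broadcasts. A minimal, self-contained sketch of the same soft-reference handle pattern (the generic SoftHandle class and its names are illustrative, not SystemML API):

import java.lang.ref.SoftReference;

public class SoftHandle<T> {
    // Soft reference: cleared by the GC only under memory pressure,
    // giving graceful degradation instead of an OutOfMemoryError.
    private final SoftReference<T> ref;

    public SoftHandle(T value) {
        ref = new SoftReference<T>(value);
    }

    // Returns the cached value, or null if the GC has evicted it.
    public T get() {
        return ref.get();
    }

    // A handle is valid only while the referent is still reachable;
    // callers must re-create the value when this turns false.
    public boolean isValid() {
        return ref.get() != null;
    }

    public static void main(String[] args) {
        SoftHandle<double[]> h = new SoftHandle<double[]>(new double[1024]);
        System.out.println(h.isValid() ? "cached" : "evicted");
    }
}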
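LineageObject tracks how many parents reference a node (_numRef) and which child objects it depends on (_childs); addLineageChild increments the child's count, so cleanup is safe only once a node's count drops back to zero. A small illustrative sketch of that counting discipline (class and method names are mine, not SystemML's):

import java.util.ArrayList;
import java.util.List;

public class RefCountedNode {
    private int numRef = 0;
    private final List<RefCountedNode> children = new ArrayList<RefCountedNode>();

    public void addChild(RefCountedNode child) {
        // Adding a lineage edge pins the child: its count rises by one.
        child.numRef++;
        children.add(child);
    }

    // Called when a parent releases this node; returns true if the node
    // (and, transitively, its children) may now be cleaned up.
    public boolean release() {
        numRef--;
        if (numRef > 0)
            return false;
        for (RefCountedNode c : children)
            c.release();
        return true;
    }

    public static void main(String[] args) {
        RefCountedNode a = new RefCountedNode();
        RefCountedNode b = new RefCountedNode();
        a.addChild(b);                   // b is now pinned by a
        System.out.println(b.release()); // true: last reference gone
    }
}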
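RDDObject records where a handle came from (checkpoint instruction vs. HDFS file) so that allowsShortCircuitRead() can skip a Spark job and read the file directly when the RDD is itself an HDFS file, or a checkpoint whose single lineage child is one. A toy decision sketch with the same shape, simplified to an early return (illustrative names, not the SystemML class):

import java.util.ArrayList;
import java.util.List;

public class RddHandle {
    private final List<RddHandle> children = new ArrayList<RddHandle>();
    private boolean checkpointed = false; // created via checkpoint instruction
    private boolean hdfsFile = false;     // backed directly by an HDFS file

    public void addChild(RddHandle c) { children.add(c); }
    public void setCheckpointed(boolean f) { checkpointed = f; }
    public void setHdfsFile(boolean f) { hdfsFile = f; }

    // Reading the file beats collecting the RDD if the handle is an HDFS
    // file itself, or a checkpoint whose only input is an HDFS file.
    public boolean allowsShortCircuitRead() {
        if (hdfsFile)
            return true;
        return checkpointed && children.size() == 1
            && children.get(0).hdfsFile;
    }

    public static void main(String[] args) {
        RddHandle file = new RddHandle();
        file.setHdfsFile(true);
        RddHandle chk = new RddHandle();
        chk.setCheckpointed(true);
        chk.addChild(file);
        System.out.println(chk.allowsShortCircuitRead()); // true
    }
}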
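In the sum path of GroupedAggMRCombiner, when the aggregate operator carries a correction the combiner accumulates value*weight into a KahanObject, preserving precision across many small additions. The underlying compensated-summation step looks like this (a standalone sketch of Kahan's algorithm, not the SystemML KahanPlus class):

public class KahanSum {
    private double sum = 0;        // running sum
    private double correction = 0; // compensation for lost low-order bits

    public void add(double value) {
        double corrected = value + correction;
        double newSum = sum + corrected;
        // (newSum - sum) is what was actually added; its difference to
        // 'corrected' is the rounding error carried into the next step.
        correction = corrected - (newSum - sum);
        sum = newSum;
    }

    public double get() { return sum; }

    public static void main(String[] args) {
        KahanSum k = new KahanSum();
        double plain = 0;
        for (int i = 0; i < 10000000; i++) {
            k.add(0.1);
            plain += 0.1;
        }
        // The compensated sum stays much closer to 1000000.0 than the
        // naive accumulation does.
        System.out.println("kahan: " + k.get() + ", naive: " + plain);
    }
}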
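IndexSortComparable orders primarily by the double value and breaks ties by the long index, which makes the MapReduce sort stable; comparing against a bare DoubleWritable additionally lets a partitioner probe only the primary key. The same composite ordering expressed as a plain Java comparator, outside Hadoop (the Pair type is illustrative):

import java.util.Arrays;
import java.util.Comparator;

public class ValueIndexSort {
    static final class Pair {
        final double value; // primary sort key
        final long index;   // original position, used as tie-breaker
        Pair(double v, long i) { value = v; index = i; }
    }

    // Primary: value ascending; secondary: index ascending (stability).
    static final Comparator<Pair> CMP = new Comparator<Pair>() {
        public int compare(Pair a, Pair b) {
            int c = Double.compare(a.value, b.value);
            return (c != 0) ? c : Long.compare(a.index, b.index);
        }
    };

    public static void main(String[] args) {
        Pair[] data = { new Pair(3.0, 0), new Pair(1.0, 1),
                        new Pair(3.0, 2), new Pair(1.0, 3) };
        Arrays.sort(data, CMP);
        // Ties on value keep their original index order:
        // 1.0 @ 1, 1.0 @ 3, 3.0 @ 0, 3.0 @ 2
        for (Pair p : data)
            System.out.println(p.value + " @ " + p.index);
    }
}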
