This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 85e066d  SUBMARINE-336. Add a PyTorch example in mini-submarine
85e066d is described below

commit 85e066dd2c5d659ba66b70703225f6cd1c99e266
Author: Ryan Lo <[email protected]>
AuthorDate: Tue Jan 14 19:59:42 2020 +0800

    SUBMARINE-336. Add a PyTorch example in mini-submarine
    
    ### What is this PR for?
    Add a PyTorch example in mini-submarine
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    
    ### What is the Jira issue?
    
[SUBMARINE-336](https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-336)
    
    ### How should this be tested?
    [passed CI](https://travis-ci.org/lowc1012/submarine/builds/636368078)
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Ryan Lo <[email protected]>
    
    Closes #150 from lowc1012/SUBMARINE-336 and squashes the following commits:
    
    e5a75c3 [Ryan Lo] SUBMARINE-336. Update run_submarine_pytorch_mnist_tony.sh
    df4923e [Ryan Lo] SUBMARINE-336. Add a PyTorch example in mini-submarine
---
 .../submarine/build_python_virtual_env.sh          |   2 +
 .../submarine/pytorch_mnist_distributed.py         | 242 +++++++++++++++++++++
 .../submarine/run_submarine_pytorch_mnist_tony.sh  |  58 +++++
 3 files changed, 302 insertions(+)

diff --git a/dev-support/mini-submarine/submarine/build_python_virtual_env.sh b/dev-support/mini-submarine/submarine/build_python_virtual_env.sh
index 4ecfc01..6ecd1c8 100755
--- a/dev-support/mini-submarine/submarine/build_python_virtual_env.sh
+++ b/dev-support/mini-submarine/submarine/build_python_virtual_env.sh
@@ -21,6 +21,8 @@ tar xf virtualenv-16.0.0.tar.gz
 python3 virtualenv-16.0.0/virtualenv.py venv
 . venv/bin/activate
 pip3 install tensorflow==1.13.1
+pip3 install torch==0.4.1
+pip3 install torchvision==0.1.8
 pip3 install /opt/pysubmarine/.
 zip -r myvenv.zip venv
 deactivate
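
For reference, a quick sanity check (not part of this commit) that the new packages landed in the venv before it is zipped; this assumes it is run from the same directory as the script above:

    . venv/bin/activate
    python -c "import torch, torchvision; print(torch.__version__)"
    deactivate
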
diff --git a/dev-support/mini-submarine/submarine/pytorch_mnist_distributed.py b/dev-support/mini-submarine/submarine/pytorch_mnist_distributed.py
new file mode 100644
index 0000000..667603a
--- /dev/null
+++ b/dev-support/mini-submarine/submarine/pytorch_mnist_distributed.py
@@ -0,0 +1,242 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""A deep MNIST classifier using convolutional layers.
+
+This example was adapted from
+https://pytorch.org/docs/master/distributed.html
+https://pytorch.org/tutorials/intermediate/dist_tuto.html
+https://github.com/narumiruna/pytorch-distributed-example/blob/master/mnist/main.py
+
+Each worker reads the full MNIST dataset and asynchronously trains a CNN with 
dropout and using the Adam optimizer,
+updating the model parameters on shared parameter servers.
+
+The current training accuracy is printed out after every 100 steps.
+"""
+
+from __future__ import division, print_function
+
+import argparse
+
+import os
+import torch
+import torch.nn.functional as F
+from torch import distributed, nn
+from torch.utils import data
+from torch.utils.data.distributed import DistributedSampler
+from torchvision import datasets, transforms
+
+
+class AverageMeter(object):
+
+    def __init__(self):
+        self.sum = 0
+        self.count = 0
+
+    def update(self, value, number):
+        self.sum += value * number
+        self.count += number
+
+    @property
+    def average(self):
+        return self.sum / self.count
+
+
+class AccuracyMeter(object):
+
+    def __init__(self):
+        self.correct = 0
+        self.count = 0
+
+    def update(self, output, label):
+        predictions = output.data.argmax(dim=1)
+        correct = predictions.eq(label.data).sum().item()
+
+        self.correct += correct
+        self.count += output.size(0)
+
+    @property
+    def accuracy(self):
+        return self.correct / self.count
+
+
+class Trainer(object):
+
+    def __init__(self, net, optimizer, train_loader, test_loader, device):
+        self.net = net
+        self.optimizer = optimizer
+        self.train_loader = train_loader
+        self.test_loader = test_loader
+        self.device = device
+
+    def train(self):
+        train_loss = AverageMeter()
+        train_acc = AccuracyMeter()
+
+        self.net.train()
+
+        for data, label in self.train_loader:
+            data = data.to(self.device)
+            label = label.to(self.device)
+
+            output = self.net(data)
+            loss = F.cross_entropy(output, label)
+
+            self.optimizer.zero_grad()
+            loss.backward()
+            # average the gradients
+            self.average_gradients()
+            self.optimizer.step()
+
+            train_loss.update(loss.item(), data.size(0))
+            train_acc.update(output, label)
+
+        return train_loss.average, train_acc.accuracy
+
+    def evaluate(self):
+        test_loss = AverageMeter()
+        test_acc = AccuracyMeter()
+
+        self.net.eval()
+
+        with torch.no_grad():
+            for data, label in self.test_loader:
+                data = data.to(self.device)
+                label = label.to(self.device)
+
+                output = self.net(data)
+                loss = F.cross_entropy(output, label)
+
+                test_loss.update(loss.item(), data.size(0))
+                test_acc.update(output, label)
+
+        return test_loss.average, test_acc.accuracy
+
+    def average_gradients(self):
+        # Synchronize workers: sum each parameter's gradient with all_reduce,
+        # then divide by the world size so every worker applies the average.
+        world_size = distributed.get_world_size()
+
+        for p in self.net.parameters():
+            group = distributed.new_group(ranks=list(range(world_size)))
+
+            # Reduce on a CPU copy of the gradient, then write it back.
+            tensor = p.grad.data.cpu()
+
+            distributed.all_reduce(
+                tensor, op=distributed.reduce_op.SUM, group=group)
+
+            tensor /= float(world_size)
+
+            p.grad.data = tensor.to(self.device)
+
+
+class Net(nn.Module):
+
+    def __init__(self):
+        super(Net, self).__init__()
+        self.fc = nn.Linear(784, 10)
+
+    def forward(self, x):
+        return self.fc(x.view(x.size(0), -1))
+
+
+def get_dataloader(root, batch_size):
+    transform = transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.13066047740239478,), (0.3081078087569972,))
+    ])
+
+    train_set = datasets.MNIST(
+        root, train=True, transform=transform, download=True)
+    sampler = DistributedSampler(train_set)
+
+    train_loader = data.DataLoader(
+        train_set,
+        batch_size=batch_size,
+        shuffle=(sampler is None),
+        sampler=sampler)
+
+    test_loader = data.DataLoader(
+        datasets.MNIST(root, train=False, transform=transform, download=True),
+        batch_size=batch_size,
+        shuffle=False)
+
+    return train_loader, test_loader
+
+
+def solve(args):
+    device = torch.device('cuda' if args.cuda else 'cpu')
+
+    net = Net().to(device)
+
+    optimizer = torch.optim.Adam(net.parameters(), lr=args.learning_rate)
+
+    train_loader, test_loader = get_dataloader(args.root, args.batch_size)
+
+    trainer = Trainer(net, optimizer, train_loader, test_loader, device)
+
+    for epoch in range(1, args.epochs + 1):
+        train_loss, train_acc = trainer.train()
+        test_loss, test_acc = trainer.evaluate()
+
+        print(
+            'Epoch: {}/{},'.format(epoch, args.epochs),
+            'train loss: {:.6f}, train acc: {:.6f}, test loss: {:.6f}, test acc: {:.6f}.'.
+                format(train_loss, train_acc, test_loss, test_acc))
+
+
+def init_process(args):
+    distributed.init_process_group(
+        backend=args.backend,
+        init_method=args.init_method,
+        rank=args.rank,
+        world_size=args.world_size)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--backend',
+        type=str,
+        default='tcp',
+        help='Name of the backend to use.')
+    parser.add_argument(
+        '--init-method',
+        '-i',
+        type=str,
+        default=os.environ.get('INIT_METHOD', 'tcp://127.0.0.1:23456'),
+        help='URL specifying how to initialize the package.')
+    parser.add_argument(
+        '--rank', '-r',
+        type=int,
+        default=int(os.environ.get('RANK')),
+        help='Rank of the current process.')
+    parser.add_argument(
+        '--world-size',
+        '-s',
+        type=int,
+        default=int(os.environ.get('WORLD')),
+        help='Number of processes participating in the job.')
+    parser.add_argument('--epochs', type=int, default=20)
+    parser.add_argument('--no-cuda', action='store_true')
+    parser.add_argument('--learning-rate', '-lr', type=float, default=1e-3)
+    parser.add_argument('--root', type=str, default='data')
+    parser.add_argument('--batch-size', type=int, default=128)
+    args = parser.parse_args()
+    args.cuda = torch.cuda.is_available() and not args.no_cuda
+    print(args)
+
+    init_process(args)
+    solve(args)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
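
For reference, the script can also be exercised outside YARN by launching two local processes; a minimal sketch, assuming the argument defaults above (tcp backend, init method tcp://127.0.0.1:23456), where RANK and WORLD are the environment variables the script itself reads:

    export WORLD=2
    RANK=0 python pytorch_mnist_distributed.py --epochs 1 &
    RANK=1 python pytorch_mnist_distributed.py --epochs 1
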
diff --git a/dev-support/mini-submarine/submarine/run_submarine_pytorch_mnist_tony.sh b/dev-support/mini-submarine/submarine/run_submarine_pytorch_mnist_tony.sh
new file mode 100755
index 0000000..55dc236
--- /dev/null
+++ b/dev-support/mini-submarine/submarine/run_submarine_pytorch_mnist_tony.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+while [ $# -gt 0 ]; do
+  case "$1" in
+    --debug*)
+      DEBUG=$1
+      if [ -n "$2" ]; then
+        DEBUG_PORT=$2
+        shift
+      fi
+      shift
+      ;;
+    *)
+      break
+      ;;
+  esac
+done
+
+if [ "$DEBUG" ]; then
+  if [ -z "$DEBUG_PORT" ]; then
+    DEBUG_PORT=8000
+  fi
+  JAVA_CMD="java 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=${DEBUG_PORT}"
+else
+  JAVA_CMD="java"
+fi
+
+SUBMARINE_VERSION=0.3.0-SNAPSHOT
+HADOOP_VERSION=2.9
+SUBMARINE_PATH=/opt/submarine-current
+HADOOP_CONF_PATH=/usr/local/hadoop/etc/hadoop
+MNIST_PATH=/home/yarn/submarine
+
+${JAVA_CMD} -cp ${SUBMARINE_PATH}/submarine-all-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}.jar:${HADOOP_CONF_PATH} \
+ org.apache.submarine.client.cli.Cli job run \
+ --name pytorch-job-001 \
+ --framework pytorch \
+ --input_path "" \
+ --num_workers 2 \
+ --worker_resources memory=1G,vcores=1 \
+ --worker_launch_cmd "myvenv.zip/venv/bin/python pytorch_mnist_distributed.py" \
+ --insecure \
+ --verbose \
+ --conf tony.containers.resources=${MNIST_PATH}/myvenv.zip#archive,${MNIST_PATH}/pytorch_mnist_distributed.py,${SUBMARINE_PATH}/submarine-all-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}.jar
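
For reference, the option loop above also accepts a debug flag that attaches a JDWP agent to the client JVM; a hedged usage sketch (port 8001 is an arbitrary example; the script defaults to 8000 when no port is given):

    ./run_submarine_pytorch_mnist_tony.sh --debug 8001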

