This is an automated email from the ASF dual-hosted git repository.

joddiyzhang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/singa.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8b52cd7  Large dataset CNN
     new 980ea11  Merge pull request #827 from chrishkchris/large_dataset
8b52cd7 is described below

commit 8b52cd79f4de8f02d357231fc8d01677eb517c5c
Author: Chris Yeung <[email protected]>
AuthorDate: Thu Jan 14 15:43:28 2021 +0800

    Large dataset CNN
---
 examples/largedataset_cnn/README.md          |  33 +++
 examples/largedataset_cnn/process_data.py    |  82 ++++++++
 examples/largedataset_cnn/train_largedata.py | 301 +++++++++++++++++++++++++++
 examples/largedataset_cnn/train_mpi.py       |  83 ++++++++
 4 files changed, 499 insertions(+)

diff --git a/examples/largedataset_cnn/README.md 
b/examples/largedataset_cnn/README.md
new file mode 100644
index 0000000..7d71279
--- /dev/null
+++ b/examples/largedataset_cnn/README.md
@@ -0,0 +1,33 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Image Classification using Convolutional Neural Networks with a dataset from the filesystem
+
+Examples inside this folder show how to train CNN models using SINGA for image 
classification.
+
+It reads the dataset from the filesystem as defined by `process_data.py`. Hence,
+users can modify `process_data.py` for their preferred dataset format.
+
+Before running the code, the `model` folder in `examples/cnn` should be copied 
to this directory.
+
+* `train_largedata.py` is the training script, which controls the training 
flow by
+  doing BackPropagation and SGD update.
+
+* `train_mpi.py` is the script for distributed training (among multiple nodes) 
+  using MPI and NCCL for communication.
diff --git a/examples/largedataset_cnn/process_data.py 
b/examples/largedataset_cnn/process_data.py
new file mode 100644
index 0000000..0dd3c03
--- /dev/null
+++ b/examples/largedataset_cnn/process_data.py
@@ -0,0 +1,82 @@
+import os
+import imghdr
+import pickle as pkl
+import numpy as np
+from PIL import Image
+
def paths_to_images(paths, image_size):
    """Load image files into a channel-first float32 batch.

    Each path is opened, converted to RGB, bilinearly resized to
    (image_size, image_size), transposed to (C, H, W), and scaled
    into [0, 1].

    Args:
        paths: sequence of image file paths.
        image_size: target side length in pixels.

    Returns:
        float32 numpy array of shape (len(paths), 3, image_size, image_size).
    """
    batch = np.zeros((len(paths), 3, image_size, image_size), dtype=np.float32)

    for idx, path in enumerate(paths):
        rgb = Image.open(path).convert('RGB')
        resized = rgb.resize((image_size, image_size), Image.BILINEAR)
        # HWC -> CHW to match the network's expected layout
        batch[idx] = np.moveaxis(np.array(resized), -1, 0)

    batch /= 255

    return batch
+
+
def process_data(dataset_root, classes):
    """Build shuffled train/validation splits from a directory-per-class tree.

    Args:
        dataset_root: directory containing one sub-directory per class, each
            holding that class's image files.
        classes: path to a UTF-8 text file listing class names, one per line;
            a name's line index is its integer label.

    Returns:
        (train_input_paths, train_labels, val_input_paths, val_labels):
        numpy arrays of file paths and int32 labels.  The first 80% of a
        seeded shuffle forms the training split, the remainder validation.

    Raises:
        ValueError: if a sub-directory of dataset_root is not listed in the
            classes file.
    """
    # load class names (one per line)
    with open(classes, 'r', encoding='utf-8') as f:
        classes = [line.strip() for line in f]

    # collect image paths and integer labels
    input_paths, labels = [], []
    for class_name in os.listdir(dataset_root):
        class_root = os.path.join(dataset_root, class_name)
        if not os.path.isdir(class_root):
            # fix: skip stray files (e.g. .DS_Store) next to the class
            # folders; listing them as a directory would crash
            continue
        class_id = classes.index(class_name)
        for path in os.listdir(class_root):
            path = os.path.join(class_root, path)
            # imghdr sniffs the file's magic bytes; non-images are skipped
            # NOTE(review): imghdr is deprecated and removed in Python 3.13;
            # plan a replacement when upgrading the interpreter.
            if imghdr.what(path) is None:
                continue
            input_paths.append(path)
            labels.append(class_id)

    # convert to numpy arrays for fancy indexing below
    input_paths = np.array(input_paths)
    labels = np.array(labels, dtype=np.int32)

    # deterministic shuffle so every run (and every rank) sees the same split
    np.random.seed(0)
    perm = np.random.permutation(len(input_paths))
    input_paths = input_paths[perm]
    labels = labels[perm]

    # 80/20 train/validation split
    border = int(len(input_paths) * 0.8)
    train_labels = labels[:border]
    val_labels = labels[border:]
    train_input_paths = input_paths[:border]
    val_input_paths = input_paths[border:]

    print("Training on %d images and labels" % (len(train_input_paths)))
    print("Validation on %d images and labels" % (len(val_input_paths)))

    return train_input_paths, train_labels, val_input_paths, val_labels
+
def loaddata():
    """Load the default on-disk dataset (edit these paths to relocate it)."""
    return process_data(dataset_root='/Dataset/Data/',
                        classes='/Dataset/classes.txt')
+
if __name__ == '__main__':
    # Smoke-test the data pipeline when this module is run directly.
    train_input_paths, train_labels, val_input_paths, val_labels = loaddata()

    for split in (train_input_paths, train_labels, val_input_paths,
                  val_labels):
        print(split.shape)

    # load a handful of images to confirm decoding and resizing work
    sample = paths_to_images(paths=train_input_paths[0:5], image_size=299)
    print(sample)
    print(sample.shape)
\ No newline at end of file
diff --git a/examples/largedataset_cnn/train_largedata.py 
b/examples/largedataset_cnn/train_largedata.py
new file mode 100644
index 0000000..dad4670
--- /dev/null
+++ b/examples/largedataset_cnn/train_largedata.py
@@ -0,0 +1,301 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from singa import singa_wrap as singa
+from singa import device
+from singa import tensor
+from singa import opt
+import numpy as np
+import time
+import argparse
+from PIL import Image
+import process_data
+
# Data Augmentation
def augmentation(x, batch_size):
    """Randomly crop and horizontally flip each image in the batch, in place.

    Each image is symmetric-padded by 4 pixels on every side, then cropped
    back to its original size at a random offset in [0, 8), and flipped
    left-right with probability 0.5.

    Args:
        x: batch of shape (batch_size, channels, height, width); modified
            in place and also returned.
        batch_size: number of images in x to augment.

    Returns:
        The augmented batch (the same array object as x).
    """
    xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric')
    for data_num in range(0, batch_size):
        offset = np.random.randint(8, size=2)
        x[data_num, :, :, :] = xpad[data_num, :,
                                    offset[0]:offset[0] + x.shape[2],
                                    # fix: width must use shape[3]; the
                                    # original used shape[2], which only
                                    # worked for square images
                                    offset[1]:offset[1] + x.shape[3]]
        if_flip = np.random.randint(2)
        if (if_flip):
            # mirror along the width axis
            x[data_num, :, :, :] = x[data_num, :, :, ::-1]
    return x
+
+
# Calculate Accuracy
def accuracy(pred, target):
    """Count predictions whose argmax matches the ground-truth label.

    Args:
        pred: (batch, num_classes) array of network outputs.
        target: (batch,) array of integer ground-truth labels.

    Returns:
        Number of correct predictions as a numpy integer.
    """
    predicted = np.argmax(pred, axis=1)
    hits = predicted == target
    return np.array(hits, "int").sum()
+
+
# Data partition according to the rank
def partition(global_rank, world_size, train_x, train_y, val_x, val_y):
    """Return this rank's contiguous shard of the train and val arrays.

    Every rank receives an equal-sized slice; any remainder rows at the
    end of an array are dropped.
    """

    def shard(xs, ys):
        # equal-sized contiguous slice for this rank
        per_rank = xs.shape[0] // world_size
        lo = global_rank * per_rank
        hi = lo + per_rank
        return xs[lo:hi], ys[lo:hi]

    train_x, train_y = shard(train_x, train_y)
    val_x, val_y = shard(val_x, val_y)
    return train_x, train_y, val_x, val_y
+
+
# Function to all reduce NUMPY Accuracy and Loss from Multiple Devices
def reduce_variable(variable, dist_opt, reducer):
    """All-reduce a host-side numpy value across ranks.

    Copies `variable` into the device tensor `reducer`, runs the
    distributed optimizer's all-reduce on it, waits for completion,
    and returns the reduced value back as a numpy array.
    """
    reducer.copy_from_numpy(variable)
    dist_opt.all_reduce(reducer.data)
    dist_opt.wait()
    return tensor.to_numpy(reducer)
+
+
def resize_dataset(x, image_size):
    """Bilinearly resize every channel of every image in a 4-D batch.

    Args:
        x: array of shape (num_data, channels, height, width).
        image_size: target side length in pixels.

    Returns:
        float32 array of shape (num_data, channels, image_size, image_size).
    """
    num_data, dim = x.shape[0], x.shape[1]
    out = np.zeros(shape=(num_data, dim, image_size, image_size),
                   dtype=np.float32)
    for n in range(num_data):
        for d in range(dim):
            # resize each channel independently through PIL
            channel = Image.fromarray(x[n, d, :, :])
            resized = channel.resize((image_size, image_size), Image.BILINEAR)
            out[n, d, :, :] = np.array(resized, dtype=np.float32)
    return out
+
+
def run(global_rank,
        world_size,
        local_rank,
        max_epoch,
        batch_size,
        model,
        data,
        sgd,
        graph,
        verbosity,
        dist_option='fp32',
        spars=None):
    """Train and evaluate a CNN whose images are streamed from disk per batch.

    Args:
        global_rank: rank of this process across all nodes (0 for single GPU).
        world_size: total number of participating processes.
        local_rank: GPU device id to use on this node.
        max_epoch: number of training epochs.
        batch_size: mini-batch size per process.
        model: model name: 'resnet', 'xceptionnet', 'cnn', or 'alexnet'.
        data: unused placeholder kept for interface compatibility with the
            examples/cnn training scripts.
        sgd: singa opt.SGD, or opt.DistOpt wrapping one for distributed runs.
        graph: whether to build and use the computational graph.
        verbosity: device logging verbosity level.
        dist_option: gradient exchange scheme for distributed training.
        spars: sparsity parameter for the sparse dist options.
    """
    dev = device.create_cuda_gpu_on(local_rank)
    dev.SetRandSeed(0)
    np.random.seed(0)

    # train_x/val_x hold image file paths; pixels are loaded per batch below.
    # fix: process_data defines loaddata(), not loadfood172()
    train_x, train_y, val_x, val_y = process_data.loaddata()

    num_channels = 3
    # labels are 0..C-1, so the number of classes is max label + 1
    num_classes = (np.max(train_y) + 1).item()
    print(num_classes)

    if model == 'resnet':
        from model import resnet
        model = resnet.resnet50(num_channels=num_channels,
                                num_classes=num_classes)
    elif model == 'xceptionnet':
        from model import xceptionnet
        model = xceptionnet.create_model(num_channels=num_channels,
                                         num_classes=num_classes)
    elif model == 'cnn':
        from model import cnn
        model = cnn.create_model(num_channels=num_channels,
                                 num_classes=num_classes)
    elif model == 'alexnet':
        from model import alexnet
        model = alexnet.create_model(num_channels=num_channels,
                                     num_classes=num_classes)

    # For distributed training, sequential gives better performance
    if hasattr(sgd, "communicator"):
        DIST = True
        sequential = True
    else:
        DIST = False
        sequential = False

    if DIST:
        train_x, train_y, val_x, val_y = partition(global_rank, world_size,
                                                   train_x, train_y, val_x,
                                                   val_y)

    if model.dimension == 4:
        tx = tensor.Tensor(
            (batch_size, num_channels, model.input_size, model.input_size),
            dev, tensor.float32)
    elif model.dimension == 2:
        # NOTE(review): train_x holds file paths here, so this branch only
        # makes sense for models fed pre-loaded flat features — verify
        # before using a 2-D model with this script.
        # fix: data_size was previously undefined (NameError)
        data_size = int(np.prod(train_x.shape[1:]))
        tx = tensor.Tensor((batch_size, data_size), dev, tensor.float32)
        # fix: the reshaped arrays were previously discarded (no-op)
        train_x = np.reshape(train_x, (train_x.shape[0], -1))
        val_x = np.reshape(val_x, (val_x.shape[0], -1))

    ty = tensor.Tensor((batch_size,), dev, tensor.int32)
    num_train_batch = train_x.shape[0] // batch_size
    num_val_batch = val_x.shape[0] // batch_size
    idx = np.arange(train_x.shape[0], dtype=np.int32)

    # attach the model to the device/graph and set its optimizer
    model.on_device(dev)
    model.set_optimizer(sgd)
    model.compile([tx], is_train=True, use_graph=graph, sequential=sequential)
    dev.SetVerbosity(verbosity)

    import os
    checkpointpath = "checkpoint.zip"
    # resume from a previous checkpoint if one exists
    if os.path.exists(checkpointpath):
        model.load_states(fpath=checkpointpath)

    # Training and Evaluation Loop
    for epoch in range(max_epoch):
        start_time = time.time()
        np.random.shuffle(idx)

        if global_rank == 0:
            print('Starting Epoch %d:' % (epoch))

        train_correct = np.zeros(shape=[1], dtype=np.float32)
        test_correct = np.zeros(shape=[1], dtype=np.float32)
        train_loss = np.zeros(shape=[1], dtype=np.float32)

        # Training Phase
        model.train()
        for b in range(num_train_batch):
            # load this batch's images from disk, then augment them
            x = train_x[idx[b * batch_size:(b + 1) * batch_size]]
            x = process_data.paths_to_images(x, model.input_size)
            if model.dimension == 4:
                x = augmentation(x, batch_size)
            y = train_y[idx[b * batch_size:(b + 1) * batch_size]]

            # copy the batch into the input tensors
            tx.copy_from_numpy(x)
            ty.copy_from_numpy(y)

            # forward + backward + update
            out, loss = model(tx, ty, dist_option, spars)
            train_correct += accuracy(tensor.to_numpy(out), y)
            train_loss += tensor.to_numpy(loss)[0]

        if DIST:
            # aggregate training accuracy and loss across all ranks
            reducer = tensor.Tensor((1,), dev, tensor.float32)
            train_correct = reduce_variable(train_correct, sgd, reducer)
            train_loss = reduce_variable(train_loss, sgd, reducer)

        if global_rank == 0:
            print('Training loss = %f, training accuracy = %f' %
                  (train_loss, train_correct /
                   (num_train_batch * batch_size * world_size)),
                  flush=True)

        # Evaluation Phase
        model.eval()
        for b in range(num_val_batch):
            x = val_x[b * batch_size:(b + 1) * batch_size]
            x = process_data.paths_to_images(x, model.input_size)
            y = val_y[b * batch_size:(b + 1) * batch_size]
            tx.copy_from_numpy(x)
            ty.copy_from_numpy(y)
            out_test = model(tx)
            test_correct += accuracy(tensor.to_numpy(out_test), y)

        if DIST:
            # aggregate evaluation accuracy across all ranks
            test_correct = reduce_variable(test_correct, sgd, reducer)

        # output the evaluation accuracy
        if global_rank == 0:
            print('Evaluation accuracy = %f, Elapsed Time = %fs' %
                  (test_correct / (num_val_batch * batch_size * world_size),
                   time.time() - start_time),
                  flush=True)

    dev.PrintTimeProfiling()

    # rank 0 persists the final model states, replacing any old checkpoint
    if global_rank == 0:
        if os.path.exists(checkpointpath):
            os.remove(checkpointpath)
        model.save_states(checkpointpath)
+
+
if __name__ == '__main__':
    # Command-line entry point for single-GPU training.
    parser = argparse.ArgumentParser(
        description='Training using the autograd and graph.')
    parser.add_argument('model',
                        choices=['resnet', 'xceptionnet', 'cnn', 'alexnet'],
                        # fix: a positional default is only used with
                        # nargs='?'; without it the argument was required
                        nargs='?',
                        default='cnn')
    parser.add_argument('--epoch',
                        '--max-epoch',
                        default=10,
                        type=int,
                        help='maximum epochs',
                        dest='max_epoch')
    parser.add_argument('--bs',
                        '--batch-size',
                        default=64,
                        type=int,
                        help='batch size',
                        dest='batch_size')
    parser.add_argument('--lr',
                        '--learning-rate',
                        default=0.005,
                        type=float,
                        help='initial learning rate',
                        dest='lr')
    # determine which gpu to use
    parser.add_argument('--id',
                        '--device-id',
                        default=0,
                        type=int,
                        help='which GPU to use',
                        dest='device_id')
    parser.add_argument('--no-graph',
                        '--disable-graph',
                        # fix: was the string 'True'; store_false expects a
                        # boolean default so args.graph is always a bool
                        default=True,
                        action='store_false',
                        help='disable graph',
                        dest='graph')
    parser.add_argument('--verbosity',
                        '--log-verbosity',
                        default=0,
                        type=int,
                        help='logging verbosity',
                        dest='verbosity')

    args = parser.parse_args()

    sgd = opt.SGD(lr=args.lr, momentum=0.9, weight_decay=1e-5)
    # single process: global rank 0 in a world of size 1
    run(0, 1, args.device_id, args.max_epoch, args.batch_size, args.model,
        "no", sgd, args.graph, args.verbosity)
diff --git a/examples/largedataset_cnn/train_mpi.py 
b/examples/largedataset_cnn/train_mpi.py
new file mode 100644
index 0000000..28d260b
--- /dev/null
+++ b/examples/largedataset_cnn/train_mpi.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from singa import singa_wrap as singa
+from singa import opt
+import argparse
+import train_largedata
+
if __name__ == '__main__':
    # Command-line entry point for MPI/NCCL multi-process training.
    parser = argparse.ArgumentParser(
        description='Training using the autograd and graph.')
    # NOTE(review): 'mlp' is accepted here but train_largedata.run() only
    # builds resnet/xceptionnet/cnn/alexnet — confirm before using 'mlp'.
    parser.add_argument('model',
                        choices=['resnet', 'xceptionnet', 'cnn', 'mlp'],
                        # fix: a positional default is only used with
                        # nargs='?'; without it the argument was required
                        nargs='?',
                        default='cnn')
    parser.add_argument('--epoch',
                        '--max-epoch',
                        default=10,
                        type=int,
                        help='maximum epochs',
                        dest='max_epoch')
    parser.add_argument('--bs',
                        '--batch-size',
                        default=64,
                        type=int,
                        help='batch size',
                        dest='batch_size')
    parser.add_argument('--lr',
                        '--learning-rate',
                        default=0.005,
                        type=float,
                        help='initial learning rate',
                        dest='lr')
    # currently partialUpdate supports graph=False only
    parser.add_argument('--op',
                        '--option',
                        default='fp32',
                        choices=['fp32', 'fp16', 'partialUpdate',
                                 'sparseTopK', 'sparseThreshold'],
                        help='distributed training options',
                        dest='dist_option')
    parser.add_argument('--spars',
                        '--sparsification',
                        # fix: argparse does not apply `type` to defaults, so
                        # the previous default '0.05' stayed a string; use a
                        # real float
                        default=0.05,
                        type=float,
                        help='the sparsity parameter used for sparsification, '
                        'between 0 to 1',
                        dest='spars')
    parser.add_argument('--no-graph',
                        '--disable-graph',
                        # fix: was the string 'True'; store_false expects a
                        # boolean default so args.graph is always a bool
                        default=True,
                        action='store_false',
                        help='disable graph',
                        dest='graph')
    parser.add_argument('--verbosity',
                        '--log-verbosity',
                        default=0,
                        type=int,
                        help='logging verbosity',
                        dest='verbosity')

    args = parser.parse_args()

    sgd = opt.SGD(lr=args.lr, momentum=0.9, weight_decay=1e-5)
    # wrap the optimizer for NCCL/MPI all-reduce across processes
    sgd = opt.DistOpt(sgd)

    train_largedata.run(sgd.global_rank, sgd.world_size, sgd.local_rank,
                        args.max_epoch, args.batch_size, args.model, "no",
                        sgd, args.graph, args.verbosity, args.dist_option,
                        args.spars)

Reply via email to