diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ee616e1674..d361ab87af 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -551,6 +551,20 @@ commitlog_segment_size: 32MiB
 #     parameters:
 #         -
 
+# JNA library is used to operate on commmitlog file
+#use_jna_for_commitlog_io: false
+
+# Enable Direct I/O feature through JNA library. O_DIRECT and O_DSYNC flags are used for this.
+# use_jna_for_commitlog_io should be set to true to enable this feature.
+#use_direct_io_for_commitlog: false
+
+# Direct I/O feature succeeds in flushing with right alignement. Change here if not same.
+#direct_io_minimum_block_alignment: 512
+
+# Direct I/O feature allows high read/write throughput to be obtaind with preferred block size.
+# Use this to configure the same based on the disk type.
+#nvme_disk_block_size: 32MiB
+
 # Compression to apply to SSTables as they flush for compressed tables.
 # Note that tables without compression enabled do not respect this flag.
 #
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 839e4c2c7e..76e485a134 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -114,6 +114,8 @@ public class Config
 
     public ParameterizedClass seed_provider;
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
+    public boolean use_jna_for_commitlog_io = false;
+    public boolean use_direct_io_for_commitlog = false;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
     public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
@@ -505,6 +507,15 @@ public class Config
 
     public DiskOptimizationStrategy disk_optimization_strategy = DiskOptimizationStrategy.ssd;
 
+    /*
+     * Direct-IO feature provides high read/write bandwidth with optimum block size.
+     * Configure this based on disk type.
+     */
+    public DataStorageSpec.IntMebibytesBound nvme_disk_block_size = new DataStorageSpec.IntMebibytesBound("32MiB");
+
+    // Some NVME disk requires File Read/Write input pointer to be aligned for Direct-IO feature.
+    public int direct_io_minimum_block_alignment = 512;
+
     public double disk_optimization_estimate_percentile = 0.95;
 
     public double disk_optimization_page_cross_chance = 0.1;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 33b229e106..bec934e662 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2984,6 +2984,18 @@ public class DatabaseDescriptor
         return conf.commitlog_sync;
     }
 
+    public static int getNVMEDiskBlockSize()
+    {
+        return conf.nvme_disk_block_size.toBytes();
+    }
+
+    public static void setNVMEDiskBlockSize(int sizeMebibytes)
+    {
+	// Reduce maximum size to commitlog segment size.
+	int maxBytes = getCommitLogSegmentSize() < sizeMebibytes ? getCommitLogSegmentSize() : sizeMebibytes;
+        conf.nvme_disk_block_size = new DataStorageSpec.IntMebibytesBound(maxBytes);
+    }
+
     public static void setCommitLogSync(CommitLogSync sync)
     {
         conf.commitlog_sync = sync;
@@ -3013,6 +3025,29 @@ public class DatabaseDescriptor
         indexAccessMode = mode;
     }
 
+    public static boolean getUseJNAForCommitlogIO()
+    {
+        return conf.use_jna_for_commitlog_io;
+    }
+
+    public static void setUseJNAForCommitlogIO(boolean flag)
+    {
+            conf.use_jna_for_commitlog_io = flag;
+    }
+
+    public static boolean getUseDirectIOForCommitlog()
+    {
+        return conf.use_direct_io_for_commitlog;
+    }
+
+    public static void setUseDirectIOForCommitlog(boolean flag)
+    {
+        if (flag && !conf.use_jna_for_commitlog_io)
+            logger.info("Set use_jna_for_commitlog_io to true in Cassandra.yaml file to enable Direct I/O feature for CommitLog files.");
+        else
+            conf.use_direct_io_for_commitlog = flag;
+    }
+
     public static void setDiskFailurePolicy(Config.DiskFailurePolicy policy)
     {
         conf.disk_failure_policy = policy;
@@ -3341,6 +3376,16 @@ public class DatabaseDescriptor
         return diskOptimizationStrategy;
     }
 
+    public static int getDirectIOMinimumBlockAlignment()
+    {
+        return conf.direct_io_minimum_block_alignment;
+    }
+
+    public static void setDirectIOMinimumBlockAlignment(int blockAlignedTo)
+    {
+        conf.direct_io_minimum_block_alignment = blockAlignedTo;
+    }
+
     public static double getDiskOptimizationEstimatePercentile()
     {
         return conf.disk_optimization_estimate_percentile;
@@ -3678,6 +3723,11 @@ public class DatabaseDescriptor
         return conf.scripted_user_defined_functions_enabled;
     }
 
+    public static void enableScriptedUserDefinedFunctions(boolean enableScriptedUserDefinedFunctions)
+    {
+        conf.scripted_user_defined_functions_enabled = enableScriptedUserDefinedFunctions;
+    }
+
     public static boolean enableUserDefinedFunctionsThreads()
     {
         return conf.user_defined_functions_threads_enabled;
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index e6cc2fa814..a80c5e91ef 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -119,7 +119,6 @@ public abstract class AbstractCommitLogSegmentManager
                                                      DatabaseDescriptor.getCommitLogSegmentSize(),
                                                      bufferType);
 
-
         AllocatorRunnable allocator = new AllocatorRunnable();
         executor = executorFactory().infiniteLoop("COMMIT-LOG-ALLOCATOR", allocator, SAFE, NON_DAEMON, SYNCHRONIZED);
         // for simplicity, ensure the first segment is allocated before continuing
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 97a032a8b0..9e50286cf1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -136,9 +136,25 @@ public abstract class CommitLogSegment
     static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
         Configuration config = commitLog.configuration;
-        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager)
-                                                          : config.useCompression() ? new CompressedSegment(commitLog, manager)
-                                                                                    : new MemoryMappedSegment(commitLog, manager);
+        CommitLogSegment segment = null;
+
+        String outmsg = new String("Using ");
+        if (DatabaseDescriptor.getUseJNAForCommitlogIO())
+        {
+            outmsg += " JNA";
+            segment = DatabaseDescriptor.getUseDirectIOForCommitlog() ? new DirectIOSegment(commitLog, manager)
+                                                                      : new NonDirectIOSegment(commitLog, manager);
+            outmsg += DatabaseDescriptor.getUseDirectIOForCommitlog() ? " with Direct-IO" : " ";
+        }
+	else
+        {
+            outmsg += " default ";
+            segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager)
+                                             : config.useCompression() ? new CompressedSegment(commitLog, manager)
+                                                                       : new MemoryMappedSegment(commitLog, manager);
+        }
+        // Debug message test purpose.
+        manager.logger.debug("outmsg={}, segment={}",outmsg, segment);
         segment.writeLogHeader();
         return segment;
     }
@@ -175,8 +191,16 @@ public abstract class CommitLogSegment
 
         try
         {
-            channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
-            fd = NativeLibrary.getfd(channel);
+            if (DatabaseDescriptor.getUseJNAForCommitlogIO()) {
+                channel = null;
+                boolean useDirectIO = DatabaseDescriptor.getUseDirectIOForCommitlog() ? true :false;
+                fd = NativeLibrary.tryCreateFile(logFile.path(), useDirectIO);
+            }
+            else
+	    {
+                channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
+                fd = NativeLibrary.getfd(channel);
+            }
         }
         catch (IOException e)
         {
@@ -194,6 +218,8 @@ public abstract class CommitLogSegment
         CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
         endOfBuffer = buffer.capacity();
 
+        int logHeader = buffer.position();
+        flush(0, logHeader);
         lastSyncedOffset = lastMarkerOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
         headerWritten = true;
@@ -374,7 +400,7 @@ public abstract class CommitLogSegment
             {
                 flush(startMarker, sectionEnd);
             }
-            
+
             if (cdcState == CDCState.CONTAINS)
                 writeCDCIndexFile(descriptor, sectionEnd, close);
             lastSyncedOffset = lastMarkerOffset = nextMarker;
@@ -536,7 +562,8 @@ public abstract class CommitLogSegment
     {
         try
         {
-            channel.close();
+            if (!DatabaseDescriptor.getUseJNAForCommitlogIO())
+                channel.close();
             buffer = null;
         }
         catch (IOException e)
diff --git a/src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java b/src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java
new file mode 100644
index 0000000000..469d64e031
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteOrder;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.NativeLibrary;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import com.sun.jna.Native;
+import com.sun.jna.Pointer;
+
+/* Direct-IO feature works only with minimum block size and right alignement.
+ *  1. Minimum block size >= 512
+ *  2. Input pointer to write to be aligned to block size
+ *
+ * Direct-IO Implementation summary
+ * 1. File is opened through JNA with O_DIRECT|O_DYSNC flags.
+ * 2. ByteBuffer is created using allocateDirect with non-file backed.
+ * 3. Convert ByteBuffer to JNA Pointer which is required during JNA write call.
+ * 4. Alignement is necessary to succeed in write. Both pointer and block size must
+      be aligned.
+ * 5. Call JNA write with (fd, JNA Pointer, flush size) to write to disk.
+ *
+ * Note: sometime extra bytes are written due to minimum block size to complete the write
+ *       call.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+    // Used for aligning and returning to OS.
+    ByteBuffer originalByteBuffer;
+
+    // Keep JNA pointer ready to get new JNA pointer for every write position.
+    Pointer pointerToAlignedByteBuffer;
+
+    /**
+     * Constructs a new segment file.
+     *
+     * @param commitLog the commit log it will be used with.
+     */
+    DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+    {
+        super(commitLog, manager);
+        // mark the initial sync marker as uninitialised
+        int firstSync = buffer.position();
+        buffer.putInt(firstSync + 0, 2);
+        buffer.putInt(firstSync + 4, 2);
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        try
+        {
+            int pageSize = MemoryUtil.pageSize();
+
+            originalByteBuffer = ByteBuffer.allocateDirect(DatabaseDescriptor.getCommitLogSegmentSize() + pageSize);
+            ByteBuffer alignedBuffer = NativeLibrary.getAlignedByteBuffer(originalByteBuffer, pageSize, DatabaseDescriptor.getCommitLogSegmentSize());
+            alignedBuffer.order(ByteOrder.BIG_ENDIAN);
+
+            pointerToAlignedByteBuffer = Native.getDirectBufferPointer(alignedBuffer);
+
+            manager.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
+	    // Set file size. It is required for some testcases to pass
+            NativeLibrary.tryFtruncate(fd, DatabaseDescriptor.getCommitLogSegmentSize());
+            return alignedBuffer;
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new FSWriteError(e, logFile);
+        }
+    }
+
+    @Override
+    void write(int startMarker, int nextMarker)
+    {
+        // if there's room in the discard section to write an empty header,
+        // zero out the next sync marker so replayer can cleanly exit
+        if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+        {
+            buffer.putInt(nextMarker, 0);
+            buffer.putInt(nextMarker + 4, 0);
+        }
+
+        // write previous sync marker to point to next sync marker
+        // we don't chain the crcs here to ensure this method is idempotent if it fails
+        writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
+            int alignment = DatabaseDescriptor.getDirectIOMinimumBlockAlignment();
+
+            // Align startMarker marker to block size.
+            boolean isHeadOffsetAligned = (lastSyncedOffset % alignment) == 0;
+            int writeFromFileOffset = isHeadOffsetAligned ? lastSyncedOffset : (lastSyncedOffset & ~(alignment - 1));
+
+            // Align nextMarker marker to block size.
+            boolean isTailOffsetAligned = ((nextMarker - writeFromFileOffset) % alignment) == 0 ;
+            final int maxBytes = DatabaseDescriptor.getNVMEDiskBlockSize();
+
+            Pointer byteBufferPtr = pointerToAlignedByteBuffer ;
+            long nbytesWritten = 0;
+
+	    int totalBytes = 0;
+
+	    // NVME/SSD prefers block size to get high write throughput.
+	    if (isTailOffsetAligned)
+		totalBytes = nextMarker - writeFromFileOffset;
+	    else
+	    {
+		totalBytes = ((nextMarker + alignment) & ~(alignment-1)) - writeFromFileOffset;
+	    }
+
+	    if (totalBytes <= 0 )
+		return;
+
+	    int currentOffset=writeFromFileOffset;
+	    int remainingBytes=totalBytes;
+
+	    // Align FD to start of block size.
+	    if (!isHeadOffsetAligned)
+                NativeLibrary.tryLSeekSet(fd, currentOffset);
+
+	    do
+	    {
+		int nBytes = maxBytes ;
+
+		// Last write may not have enough bytes to flush and lesser than necessary
+		// size may throw write errors. Include those bytes in this write only.
+		// Maximum 25% is considered.
+		double additionalWrite = remainingBytes / (double)maxBytes;
+		final double additionalMaxWrite = 1.25;
+
+		if (additionalWrite < additionalMaxWrite)
+		    nBytes=remainingBytes;
+
+		ByteBuffer byteBufferFromOffset = pointerToAlignedByteBuffer.getByteBuffer(currentOffset, nBytes);
+		Pointer curByteBufferPtr = Native.getDirectBufferPointer(byteBufferFromOffset);
+
+		nbytesWritten += NativeLibrary.tryWrite(fd, curByteBufferPtr, nBytes);
+
+		remainingBytes -= nBytes;
+		currentOffset += nBytes;
+
+	    } while(remainingBytes > 0);
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    public long onDiskSize()
+    {
+        return DatabaseDescriptor.getCommitLogSegmentSize();
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        super.internalClose();
+        NativeLibrary.tryCloseFD(fd);
+        FileUtils.clean(originalByteBuffer);
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/commitlog/NonDirectIOSegment.java b/src/java/org/apache/cassandra/db/commitlog/NonDirectIOSegment.java
new file mode 100644
index 0000000000..c47683e2e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/NonDirectIOSegment.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteOrder;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.NativeLibrary;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.sun.jna.Native;
+import com.sun.jna.Pointer;
+
+/* This class is implemented for demo purpose. Can be changed based on feedback.
+ *
+ * Testing shows disk flush/write is slower with file backed memory maps vs non-file
+ * backed memory map. File is opened through JNA open and memory is allocated through
+ * allocateDirect with non-file backed.
+ *
+ * Based on testing following disk flush speed is observed for commitlog.
+ *  Direct-IO > Non Direct-IO through JNA > MSYNC
+ */
+public class NonDirectIOSegment extends CommitLogSegment
+{
+    Pointer pointerToAlignedByteBuffer;
+    /**
+     * Constructs a new segment file.
+     * interToAlignedByteBuffer
+     *
+     * @param commitLog the commit log it will be used with.
+     */
+    NonDirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+    {
+        super(commitLog, manager);
+        // mark the initial sync marker as uninitialised
+        int firstSync = buffer.position();
+        buffer.putInt(firstSync + 0, 2);
+        buffer.putInt(firstSync + 4, 2);
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        try
+        {
+            int pageSize = MemoryUtil.pageSize();
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(DatabaseDescriptor.getCommitLogSegmentSize() + pageSize);
+
+            byteBuffer.order(ByteOrder.BIG_ENDIAN);
+            pointerToAlignedByteBuffer = Native.getDirectBufferPointer(byteBuffer);
+
+            manager.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
+	    // Set file size. It is necessary for some testcase to pass
+            NativeLibrary.tryFtruncate(fd, DatabaseDescriptor.getCommitLogSegmentSize());
+            return byteBuffer;
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new FSWriteError(e, logFile);
+        }
+    }
+
+    @Override
+    void write(int startMarker, int nextMarker)
+    {
+        // if there's room in the discard section to write an empty header,
+        // zero out the next sync marker so replayer can cleanly exit
+        if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+        {
+            buffer.putInt(nextMarker, 0);
+            buffer.putInt(nextMarker + 4, 0);
+        }
+
+        // write previous sync marker to point to next sync marker
+        // we don't chain the crcs here to ensure this method is idempotent if it fails
+        writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
+            int alignment = DatabaseDescriptor.getDirectIOMinimumBlockAlignment();
+
+            int writeFromFileOffset = lastSyncedOffset ;
+
+            final int maxBytes = DatabaseDescriptor.getNVMEDiskBlockSize();
+
+            Pointer byteBufferPtr = pointerToAlignedByteBuffer ;
+            long nbytesWritten = 0;
+
+	    int totalBytes = nextMarker - lastSyncedOffset;
+
+	    if (totalBytes <= 0 )
+		return;
+
+	    int currentOffset=writeFromFileOffset;
+	    int remainingBytes=totalBytes;
+
+	    //NVME/SSD prefers blocks size to write at high speed.
+
+	    do
+	    {
+		int nBytes = maxBytes ;
+
+		// Include now the remaining tails bytes of next write. Max 25% of block size.
+		double additionalWrite = remainingBytes / (double)maxBytes;
+		final double additionalMaxWrite = 1.25;
+
+		if (additionalWrite < additionalMaxWrite)
+		    nBytes=remainingBytes;
+
+		ByteBuffer byteBufferFromOffset = pointerToAlignedByteBuffer.getByteBuffer(currentOffset, nBytes);
+		Pointer curByteBufferPtr = Native.getDirectBufferPointer(byteBufferFromOffset);
+
+		nbytesWritten += NativeLibrary.tryWrite(fd, curByteBufferPtr, nBytes);
+
+		remainingBytes -= nBytes;
+		currentOffset += nBytes;
+
+	    } while(remainingBytes > 0);
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+        NativeLibrary.trySkipCache(fd, lastSyncedOffset, nextMarker, logFile.absolutePath());
+    }
+
+    @Override
+    public long onDiskSize()
+    {
+        return DatabaseDescriptor.getCommitLogSegmentSize();
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        ByteBuffer usedBuffer = buffer;
+        super.internalClose();
+        NativeLibrary.tryCloseFD(fd);
+        FileUtils.clean(usedBuffer);
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/NativeLibrary.java b/src/java/org/apache/cassandra/utils/NativeLibrary.java
index 9348433939..d2a47dd5e7 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibrary.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibrary.java
@@ -21,6 +21,7 @@ import java.io.FileDescriptor;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.util.File;
@@ -38,6 +39,8 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.OS_NAME;
 import static org.apache.cassandra.utils.NativeLibrary.OSType.LINUX;
 import static org.apache.cassandra.utils.NativeLibrary.OSType.MAC;
 import static org.apache.cassandra.utils.NativeLibrary.OSType.AIX;
+import com.sun.jna.Pointer;
+import com.sun.jna.Native;
 
 public final class NativeLibrary
 {
@@ -62,8 +65,16 @@ public final class NativeLibrary
     private static final int F_GETFL   = 3;  /* get file status flags */
     private static final int F_SETFL   = 4;  /* set file status flags */
     private static final int F_NOCACHE = 48; /* Mac OS X specific flag, turns cache on/off */
+    private static final int O_DSYNC   = 010000; /* fcntl.h */
     private static final int O_DIRECT  = 040000; /* fcntl.h */
     private static final int O_RDONLY  = 00000000; /* fcntl.h */
+    private static final int O_WRONLY  = 00000001; /* fcntl.h */
+    private static final int O_RDWR    = 00000002; /* fcntl.h */
+    private static final int O_CREAT   = 00000100; /* fcntl.h */
+
+    private static final int SEEK_SET   = 0; /* unistd.h */
+    private static final int SEEK_CUR   = 1; /* unistd.h */
+    private static final int SEEK_END   = 2; /* unistd.h */
 
     private static final int POSIX_FADV_NORMAL     = 0; /* fadvise.h */
     private static final int POSIX_FADV_RANDOM     = 1; /* fadvise.h */
@@ -261,7 +272,6 @@ public final class NativeLibrary
         catch (UnsatisfiedLinkError e)
         {
             // if JNA is unavailable just skipping Direct I/O
-            // instance of this class will act like normal RandomAccessFile
         }
         catch (RuntimeException e)
         {
@@ -433,4 +443,170 @@ public final class NativeLibrary
 
         return -1;
     }
+
+    public static long tryLSeek(int fd, long offset, int whence)
+    {
+        long seek_offset=-1;
+        try
+        {
+            return wrappedLibrary.callLSeek(fd, offset, whence);
+        }
+        catch (UnsatisfiedLinkError e)
+        {
+            // JNA is unavailable just skipping Direct I/O
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e instanceof LastErrorException))
+                throw e;
+
+            if (REQUIRE)
+            {
+                String errMsg = String.format("lseek(%d) failed, errno (%d).", fd, errno(e));
+                logger.warn(errMsg);
+                throw new FSWriteError(e, errMsg);
+            }
+        }
+        return seek_offset;
+    }
+
+    public static long tryLSeekSet(int fd, long offset)
+    {
+      return tryLSeek(fd, offset, SEEK_SET);
+    }
+
+    public static long tryLSeekCurrent(int fd, long offset)
+    {
+      return tryLSeek(fd, offset, SEEK_CUR);
+    }
+
+    public static long tryLSeekEnd(int fd, long offset)
+    {
+      return tryLSeek(fd, offset, SEEK_END);
+    }
+
+    public static int tryOpenFile(String path, int openflags, boolean directIO)
+    {
+        int fd=-1;
+
+        if (osType == LINUX && directIO) {
+            openflags |= O_DIRECT | O_DSYNC;
+        }
+
+        try
+        {
+            fd = wrappedLibrary.callOpen(path, openflags, 0666);
+        }
+        catch (UnsatisfiedLinkError e)
+        {
+            // JNA is unavailable just skipping Direct I/O
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e instanceof LastErrorException))
+                throw e;
+
+            if (REQUIRE)
+                logger.warn("open({}, openflags={}) failed, errno ({}).", path, openflags, errno(e));
+        }
+
+        return fd;
+    }
+
+    public static int tryCreateFile(String path, boolean directIO)
+    {
+        int openflags = O_RDWR;
+
+        if (osType == LINUX) {
+          openflags |= O_CREAT;
+        }
+
+	return tryOpenFile(path, openflags, directIO);
+    }
+
+    // Debug functions and will be removed
+    public static Pointer getPointerToAlignedByteBuffer(ByteBuffer buffer, long align, long buffersize)
+    {
+        Pointer ptr = Native.getDirectBufferPointer(buffer);
+        long buf_addr = Pointer.nativeValue(ptr);
+        long buf_aligned_addr = buf_addr + align & ~(align-1);
+        long offset = buf_aligned_addr - buf_addr;
+        ByteBuffer alignedbuffer = ptr.getByteBuffer(offset, buffersize);
+        Pointer ptrToAlignedBuffer = Native.getDirectBufferPointer(alignedbuffer);
+        logger.info(" buffer = " + buffer + " addr = " + ptr.toString() + " bufsize,align = " + buffersize + "," + align);
+        logger.info(" buf_addr = " + Long.toHexString(buf_addr) + " buf_aligned_addr=" + Long.toHexString(buf_aligned_addr) + " offset = "  + offset);
+        logger.info(" alignedbuffer = " + alignedbuffer + " addr = " + Native.getDirectBufferPointer(alignedbuffer).toString());
+        return ptrToAlignedBuffer;
+    }
+
+    public static void tryPrintPtrAddress(ByteBuffer buffer)
+    {
+        Pointer ptr = Native.getDirectBufferPointer(buffer);
+        logger.info(" buffer = " + buffer + " ptr = " + ptr.toString());
+    }
+
+    // Function used to align the bytebuffer address to block size.
+    public static ByteBuffer getAlignedByteBuffer(ByteBuffer buffer, long align, long buffersize)
+    {
+        Pointer ptr = Native.getDirectBufferPointer(buffer);
+        long buf_addr = Pointer.nativeValue(ptr);
+        long buf_aligned_addr = buf_addr + align & ~(align-1);
+        long offset = buf_aligned_addr - buf_addr;
+        ByteBuffer alignedbuffer = ptr.getByteBuffer(offset, buffersize);
+        return alignedbuffer;
+    }
+
+    public static long tryWrite(int fd, Pointer ptrToBuffer, long count)
+    {
+        long write_status = 0;
+
+        try
+        {
+            write_status = wrappedLibrary.callWrite(fd, ptrToBuffer, count);
+        }
+        catch (UnsatisfiedLinkError e)
+        {
+            // JNA is unavailable just skipping Direct I/O
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e instanceof LastErrorException))
+                throw e;
+
+            if (REQUIRE)
+            {
+                String errMsg = String.format("write(%s) failed, errno (%s) %s ", fd, errno(e), e.getMessage());
+                logger.warn(errMsg);
+                throw new FSWriteError(e, errMsg);
+            }
+        }
+
+        return write_status;
+    }
+
+    public static long tryWrite(int fd, ByteBuffer buffer, long count)
+    {
+        Pointer ptr = Native.getDirectBufferPointer(buffer);
+        return tryWrite(fd, ptr, count);
+    }
+
+    public static int tryFtruncate(int fd, long offset)
+    {
+        int status = -1;
+        try
+        {
+            status = wrappedLibrary.callFtruncate(fd, offset);
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e instanceof LastErrorException))
+                throw e;
+
+            if (REQUIRE)
+            {
+               logger.warn("ftrucate(%s) failed, offset(%s),errno(%s), %s", fd, offset, errno(e));
+	    }
+        }
+        return status;
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java b/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
index c119311370..ca383953cd 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
@@ -74,10 +74,14 @@ public class NativeLibraryDarwin implements NativeLibraryWrapper
     private static native int munlockall() throws LastErrorException;
     private static native int fcntl(int fd, int command, long flags) throws LastErrorException;
     private static native int open(String path, int flags) throws LastErrorException;
+    private static native int open(String path, int flags, int mode) throws LastErrorException;
     private static native int fsync(int fd) throws LastErrorException;
     private static native int close(int fd) throws LastErrorException;
+    private static native long write(int fd, Pointer buf, long count) throws LastErrorException;
+    private static native long lseek(int fd, long offset, int whence) throws LastErrorException;
     private static native Pointer strerror(int errnum) throws LastErrorException;
     private static native long getpid() throws LastErrorException;
+    private static native int ftruncate(int fd, long offset) throws LastErrorException;
 
     public int callMlockall(int flags) throws UnsatisfiedLinkError, RuntimeException
     {
@@ -125,6 +129,26 @@ public class NativeLibraryDarwin implements NativeLibraryWrapper
         return getpid();
     }
 
+    public int callOpen(String path, int flags, int mode) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return open(path, flags, mode);
+    }
+
+    public long callLSeek(int fd, long offset, int whence) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return lseek(fd, offset, whence);
+    }
+
+    public long callWrite(int fd, Pointer buf, long count) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return write(fd, buf, count);
+    }
+
+    public int callFtruncate(int fd, long offset) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return ftruncate(fd, offset);
+    }
+
     public boolean isAvailable()
     {
         return available;
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java b/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
index 9c7bb3b73b..25db5806f6 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
@@ -75,10 +75,14 @@ public class NativeLibraryLinux implements NativeLibraryWrapper
     private static native int fcntl(int fd, int command, long flags) throws LastErrorException;
     private static native int posix_fadvise(int fd, long offset, int len, int flag) throws LastErrorException;
     private static native int open(String path, int flags) throws LastErrorException;
+    private static native int open(String path, int flags, int mode) throws LastErrorException;
     private static native int fsync(int fd) throws LastErrorException;
     private static native int close(int fd) throws LastErrorException;
+    private static native long write(int fd, Pointer buf, long count) throws LastErrorException;
+    private static native long lseek(int fd, long offset, int whence) throws LastErrorException;
     private static native Pointer strerror(int errnum) throws LastErrorException;
     private static native long getpid() throws LastErrorException;
+    private static native int ftruncate(int fd, long offset) throws LastErrorException;
 
     public int callMlockall(int flags) throws UnsatisfiedLinkError, RuntimeException
     {
@@ -125,6 +129,26 @@ public class NativeLibraryLinux implements NativeLibraryWrapper
         return getpid();
     }
 
+    public int callOpen(String path, int flags, int mode) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return open(path, flags, mode);
+    }
+
+    public long callLSeek(int fd, long offset, int whence) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return lseek(fd, offset, whence);
+    }
+
+    public long callWrite(int fd, Pointer buf, long count) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return write(fd, buf, count);
+    }
+
+    public int callFtruncate(int fd, long offset) throws UnsatisfiedLinkError, RuntimeException
+    {
+        return ftruncate(fd, offset);
+    }
+
     public boolean isAvailable()
     {
         return available;
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryWrapper.java b/src/java/org/apache/cassandra/utils/NativeLibraryWrapper.java
index 2c3d47fa16..95e2e720dd 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryWrapper.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryWrapper.java
@@ -38,8 +38,12 @@ public interface NativeLibraryWrapper
     int callFcntl(int fd, int command, long flags) throws UnsatisfiedLinkError, RuntimeException;
     int callPosixFadvise(int fd, long offset, int len, int flag) throws UnsatisfiedLinkError, RuntimeException;
     int callOpen(String path, int flags) throws UnsatisfiedLinkError, RuntimeException;
+    int callOpen(String path, int flags, int mode) throws UnsatisfiedLinkError, RuntimeException;
+    long callWrite(int fd, Pointer buf, long offset) throws UnsatisfiedLinkError, RuntimeException;
+    long callLSeek(int fd, long offset, int whence) throws UnsatisfiedLinkError, RuntimeException;
     int callFsync(int fd) throws UnsatisfiedLinkError, RuntimeException;
     int callClose(int fd) throws UnsatisfiedLinkError, RuntimeException;
     Pointer callStrerror(int errnum) throws UnsatisfiedLinkError, RuntimeException;
     long callGetpid() throws UnsatisfiedLinkError, RuntimeException;
+    int callFtruncate(int fd, long offset) throws UnsatisfiedLinkError, RuntimeException;
 }
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index f98207879c..7893ad3dea 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -9,6 +9,8 @@ commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size: 5MiB
 commitlog_directory: build/test/cassandra/commitlog
+use_jna_for_commitlog_io: false
+use_direct_io_for_commitlog: false
 # commitlog_compression:
 # - class_name: LZ4Compressor
 cdc_raw_directory: build/test/cassandra/cdc_raw
